26.2 异步子Agent任务
异步子Agent任务概述
异步子Agent任务是Claude Code中处理长时间运行任务的重要机制。通过异步执行,子Agent可以在后台执行任务,主Agent可以继续处理其他请求,从而提高整体效率和响应速度。
异步任务的基本概念
1. 同步 vs 异步
示例:同步 vs 异步
同步执行:
python
python
````python
```python
def process_task(task):
"""同步处理任务"""
result1 = agent1.execute(task)
result2 = agent2.execute(result1)
result3 = agent3.execute(result2)
return result3
# 执行时间: 10 + 15 + 20 = 45秒
```> 异步执行:
```
python
async def process_task_async(task):
"""异步处理任务"""
result1 = await agent1.execute_async(task)
result2 = await agent2.execute_async(result1)
result3 = await agent3.execute_async(result2)
return result3
# 执行时间: 20秒(并行执行)
### 2. 异步任务的优势
| 特性 | 同步任务 | 异步任务 |
|------|----------|----------|
| 执行方式 | 顺序执行 | 并行执行 |
| 响应时间 | 长 | 短 |
| 资源利用率 | 低 | 高 |
| 用户体验 | 差 | 好 |
| 复杂度 | 低 | 中 |
## 异步子Agent实现
### 1. 基础异步子Agent
````python
````python
# 示例:基础异步子Agent
用户请求:
"创建一个异步代码生成子Agent"
Claude Code 生成的代码:
```python
```python
import asyncio
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class AsyncCodeGenerationAgent:
"""异步代码生成子Agent"""
def __init__(self, context):
self.context = context
self.task_queue = asyncio.Queue()
self.results = {}
self.running = False
async def start(self):
"""启动Agent"""
self.running = True
logger.info("AsyncCodeGenerationAgent started")
# 启动任务处理器
asyncio.create_task(self._process_tasks())
async def stop(self):
"""停止Agent"""
self.running = False
logger.info("AsyncCodeGenerationAgent stopped")
async def submit_task(self, task_id: str, task: Dict[str, Any]):
"""提交任务"""
await self.task_queue.put((task_id, task))
logger.info(f"Task {task_id} submitted")
async def get_result(self, task_id: str) -> Dict[str, Any]:
"""获取任务结果"""
while task_id not in self.results:
await asyncio.sleep(0.1)
return self.results[task_id]
async def _process_tasks(self):
"""处理任务"""
while self.running:
try:
task_id, task = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
logger.info(f"Processing task {task_id}")
# 处理任务
result = await self._generate_code(task)
# 保存结果
self.results[task_id] = result
logger.info(f"Task {task_id} completed")
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Error processing task: {e}")
async def _generate_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""生成代码"""
# 模拟长时间运行的任务
await asyncio.sleep(2)
code = f"""
# Generated code for {task['name']}
def {task['function_name']}():
pass
"""
return {
'code': code,
'status': 'completed',
'task_id': task.get('task_id')
}
# 使用示例
async def main():
"""主函数"""
agent = AsyncCodeGenerationAgent({})
# 启动Agent
await agent.start()
# 提交任务
task1 = {'name': 'Task 1', 'function_name': 'func1'}
task2 = {'name': 'Task 2', 'function_name': 'func2'}
task3 = {'name': 'Task 3', 'function_name': 'func3'}
await agent.submit_task('task1', task1)
await agent.submit_task('task2', task2)
await agent.submit_task('task3', task3)
# 获取结果
result1 = await agent.get_result('task1')
result2 = await agent.get_result('task2')
result3 = await agent.get_result('task3')
print("Task 1 result:", result1['status'])
print("Task 2 result:", result2['status'])
print("Task 3 result:", result3['status'])
# 停止Agent
await agent.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **异步子Agent特点**:
> - 使用asyncio实现异步处理
> - 任务队列管理
> - 结果存储和检索
> - 后台任务处理
### 2. 高级异步子Agent
````python
`python
# 示例:高级异步子Agent
用户请求:
"创建一个支持任务优先级和超时的异步子Agent"
Claude Code 生成的代码:
```python
```python
import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import heapq
import logging
logger = logging.getLogger(__name__)
class AsyncPriorityAgent:
"""支持优先级的异步子Agent"""
def __init__(self, context, max_workers=4):
self.context = context
self.task_queue = []
self.task_lock = asyncio.Lock()
self.results = {}
self.running = False
self.max_workers = max_workers
self.workers = []
self.task_stats = {
'submitted': 0,
'completed': 0,
'failed': 0,
'timeout': 0
}
async def start(self):
"""启动Agent"""
self.running = True
logger.info(f"AsyncPriorityAgent started with {self.max_workers} workers")
# 启动工作线程
for i in range(self.max_workers):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self.workers.append(worker)
async def stop(self):
"""停止Agent"""
self.running = False
# 等待所有工作线程完成
await asyncio.gather(*self.workers, return_exceptions=True)
logger.info("AsyncPriorityAgent stopped")
async def submit_task(
self,
task_id: str,
task: Dict[str, Any],
priority: int = 0,
timeout: Optional[float] = None
):
"""提交任务"""
task_data = {
'task_id': task_id,
'task': task,
'priority': priority,
'timeout': timeout,
'submitted_at': datetime.utcnow(),
'status': 'pending'
}
async with self.task_lock:
heapq.heappush(self.task_queue, (-priority, task_data))
self.task_stats['submitted'] += 1
logger.info(f"Task {task_id} submitted with priority {priority}")
async def get_result(self, task_id: str, timeout: float = 30.0) -> Dict[str, Any]:
"""获取任务结果"""
start_time = datetime.utcnow()
while True:
if task_id in self.results:
return self.results[task_id]
# 检查超时
elapsed = (datetime.utcnow() - start_time).total_seconds()
if elapsed > timeout:
raise TimeoutError(f"Task {task_id} timeout after {timeout}s")
await asyncio.sleep(0.1)
async def _worker(self, worker_name: str):
"""工作线程"""
logger.info(f"{worker_name} started")
while self.running:
try:
# 获取任务
task_data = await self._get_task()
if task_data is None:
await asyncio.sleep(0.1)
continue
task_id = task_data['task_id']
task = task_data['task']
timeout = task_data.get('timeout')
logger.info(f"{worker_name} processing task {task_id}")
# 执行任务
try:
if timeout:
result = await asyncio.wait_for(
self._execute_task(task),
timeout=timeout
)
else:
result = await self._execute_task(task)
self.results[task_id] = {
'result': result,
'status': 'completed',
'worker': worker_name,
'completed_at': datetime.utcnow()
}
self.task_stats['completed'] += 1
logger.info(f"{worker_name} completed task {task_id}")
except asyncio.TimeoutError:
self.results[task_id] = {
'error': 'Task timeout',
'status': 'timeout',
'worker': worker_name
}
self.task_stats['timeout'] += 1
logger.warning(f"{worker_name} task {task_id} timeout")
except Exception as e:
self.results[task_id] = {
'error': str(e),
'status': 'failed',
'worker': worker_name
}
self.task_stats['failed'] += 1
logger.error(f"{worker_name} task {task_id} failed: {e}")
except Exception as e:
logger.error(f"{worker_name} error: {e}")
await asyncio.sleep(1)
logger.info(f"{worker_name} stopped")
async def _get_task(self) -> Optional[Dict[str, Any]]:
"""获取任务"""
async with self.task_lock:
if self.task_queue:
_, task_data = heapq.heappop(self.task_queue)
task_data['status'] = 'processing'
return task_data
return None
async def _execute_task(self, task: Dict[str, Any]) -> Any:
"""执行任务"""
task_type = task.get('type', 'default')
if task_type == 'code_generation':
return await self._generate_code(task)
elif task_type == 'code_review':
return await self._review_code(task)
elif task_type == 'test_generation':
return await self._generate_tests(task)
else:
return await self._default_task(task)
async def _generate_code(self, task: Dict[str, Any]) -> str:
"""生成代码"""
await asyncio.sleep(2)
return f"""
# Generated code for {task['name']}
def {task['function_name']}():
pass
"""
async def _review_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""审查代码"""
await asyncio.sleep(1.5)
return {
'issues': [],
'suggestions': [],
'metrics': {}
}
async def _generate_tests(self, task: Dict[str, Any]) -> str:
"""生成测试"""
await asyncio.sleep(1)
return """
import unittest
class TestGeneratedCode(unittest.TestCase):
pass
"""
async def _default_task(self, task: Dict[str, Any]) -> Any:
"""默认任务"""
await asyncio.sleep(1)
return {'result': 'completed'}
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
'stats': self.task_stats.copy(),
'queue_size': len(self.task_queue),
'active_workers': len(self.workers)
}
# 使用示例
async def main():
"""主函数"""
agent = AsyncPriorityAgent({}, max_workers=4)
# 启动Agent
await agent.start()
# 提交不同优先级的任务
await agent.submit_task('task1', {'name': 'Low Priority', 'type': 'code_generation'}, priority=1)
await agent.submit_task('task2', {'name': 'High Priority', 'type': 'code_review'}, priority=10)
await agent.submit_task('task3', {'name': 'Medium Priority', 'type': 'test_generation'}, priority=5)
await agent.submit_task('task4', {'name': 'Timeout Task', 'type': 'code_generation'}, priority=8, timeout=1.0)
# 获取结果
try:
result1 = await agent.get_result('task1')
print("Task 1 result:", result1['status'])
except TimeoutError as e:
print(f"Task 1 error: {e}")
try:
result2 = await agent.get_result('task2')
print("Task 2 result:", result2['status'])
except TimeoutError as e:
print(f"Task 2 error: {e}")
try:
result3 = await agent.get_result('task3')
print("Task 3 result:", result3['status'])
except TimeoutError as e:
print(f"Task 3 error: {e}")
try:
result4 = await agent.get_result('task4')
print("Task 4 result:", result4['status'])
except TimeoutError as e:
print(f"Task 4 error: {e}")
# 获取统计信息
stats = agent.get_stats()
print("\nStatistics:")
print(f" Submitted: {stats['stats']['submitted']}")
print(f" Completed: {stats['stats']['completed']}")
print(f" Failed: {stats['stats']['failed']}")
print(f" Timeout: {stats['stats']['timeout']}")
print(f" Queue size: {stats['queue_size']}")
# 停止Agent
await agent.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **高级异步子Agent特点**:
> - 支持任务优先级
> - 支持任务超时
> - 多工作线程
> - 任务统计
> - 错误处理
## 异步任务模式
### 1. 任务编排模式
````python
`python
# 示例:任务编排
用户请求:
"实现一个任务编排器,协调多个异步子Agent"
Claude Code 生成的代码:
```python
```python
import asyncio
from typing import Dict, Any, List
import logging
logger = logging.getLogger(__name__)
class TaskOrchestrator:
"""任务编排器"""
def __init__(self, context):
self.context = context
self.agents = {}
self.task_graph = {}
self.results = {}
def register_agent(self, name: str, agent):
"""注册Agent"""
self.agents[name] = agent
logger.info(f"Agent {name} registered")
def define_workflow(self, workflow: Dict[str, Any]):
"""定义工作流"""
self.task_graph = workflow
logger.info("Workflow defined")
async def execute_workflow(self) -> Dict[str, Any]:
"""执行工作流"""
logger.info("Starting workflow execution")
# 执行任务
for task in self.task_graph['tasks']:
await self._execute_task(task)
logger.info("Workflow execution completed")
return self.results
async def _execute_task(self, task: Dict[str, Any]):
"""执行任务"""
task_id = task['id']
task_type = task['type']
agent_name = task['agent']
dependencies = task.get('dependencies', [])
logger.info(f"Executing task {task_id}")
# 等待依赖任务完成
for dep_id in dependencies:
await self._wait_for_task(dep_id)
# 获取Agent
agent = self.agents.get(agent_name)
if not agent:
raise ValueError(f"Agent {agent_name} not found")
# 执行任务
if task_type == 'code_generation':
result = await agent._generate_code(task['params'])
elif task_type == 'code_review':
result = await agent._review_code(task['params'])
elif task_type == 'test_generation':
result = await agent._generate_tests(task['params'])
else:
result = await agent._default_task(task['params'])
# 保存结果
self.results[task_id] = result
logger.info(f"Task {task_id} completed")
async def _wait_for_task(self, task_id: str):
"""等待任务完成"""
while task_id not in self.results:
await asyncio.sleep(0.1)
# 使用示例
async def main():
"""主函数"""
orchestrator = TaskOrchestrator({})
# 注册Agent
code_agent = AsyncCodeGenerationAgent({})
review_agent = AsyncCodeReviewAgent({})
test_agent = AsyncTestGenerationAgent({})
await code_agent.start()
await review_agent.start()
await test_agent.start()
orchestrator.register_agent('code', code_agent)
orchestrator.register_agent('review', review_agent)
orchestrator.register_agent('test', test_agent)
# 定义工作流
workflow = {
'tasks': [
{
'id': 'task1',
'type': 'code_generation',
'agent': 'code',
'params': {'name': 'User Service', 'function_name': 'create_user'}
},
{
'id': 'task2',
'type': 'code_generation',
'agent': 'code',
'params': {'name': 'Product Service', 'function_name': 'create_product'}
},
{
'id': 'task3',
'type': 'code_review',
'agent': 'review',
'params': {'code': 'result from task1'},
'dependencies': ['task1']
},
{
'id': 'task4',
'type': 'code_review',
'agent': 'review',
'params': {'code': 'result from task2'},
'dependencies': ['task2']
},
{
'id': 'task5',
'type': 'test_generation',
'agent': 'test',
'params': {'code': 'result from task1'},
'dependencies': ['task1', 'task3']
},
{
'id': 'task6',
'type': 'test_generation',
'agent': 'test',
'params': {'code': 'result from task2'},
'dependencies': ['task2', 'task4']
}
]
}
orchestrator.define_workflow(workflow)
# 执行工作流
results = await orchestrator.execute_workflow()
print("Workflow results:")
for task_id, result in results.items():
print(f" {task_id}: {result.get('status', 'unknown')}")
if __name__ == '__main__':
asyncio.run(main())
```> **任务编排特点**:
> - 定义任务依赖关系
> - 自动处理任务执行顺序
> - 支持并行执行
> - 结果收集和传递
### 2. 任务分发模式
````python
`python
# 示例:任务分发
用户请求:
"实现一个任务分发器,将任务分发给多个异步子Agent"
Claude Code 生成的代码:
```python
```python
import asyncio
from typing import Dict, Any, List
import random
import logging
logger = logging.getLogger(__name__)
class TaskDispatcher:
"""任务分发器"""
def __init__(self, context):
self.context = context
self.agents = []
self.task_queue = asyncio.Queue()
self.running = False
def register_agent(self, agent):
"""注册Agent"""
self.agents.append(agent)
logger.info(f"Agent registered, total: {len(self.agents)}")
async def start(self):
"""启动分发器"""
self.running = True
logger.info("TaskDispatcher started")
# 启动分发线程
asyncio.create_task(self._dispatch_tasks())
async def stop(self):
"""停止分发器"""
self.running = False
logger.info("TaskDispatcher stopped")
async def submit_task(self, task: Dict[str, Any]):
"""提交任务"""
await self.task_queue.put(task)
logger.info(f"Task submitted: {task.get('id', 'unknown')}")
async def _dispatch_tasks(self):
"""分发任务"""
while self.running:
try:
# 获取任务
task = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
# 选择Agent
agent = self._select_agent(task)
if agent:
# 提交任务给Agent
await agent.submit_task(task['id'], task)
logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__}")
else:
logger.warning(f"No available agent for task {task['id']}")
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Error dispatching task: {e}")
def _select_agent(self, task: Dict[str, Any]) -> Any:
"""选择Agent"""
if not self.agents:
return None
# 根据任务类型选择Agent
task_type = task.get('type', 'default')
for agent in self.agents:
if hasattr(agent, 'can_handle') and agent.can_handle(task_type):
return agent
# 随机选择一个Agent
return random.choice(self.agents)
class LoadBalancedDispatcher(TaskDispatcher):
"""负载均衡分发器"""
def __init__(self, context):
super().__init__(context)
self.agent_loads = {}
def register_agent(self, agent):
"""注册Agent"""
super().register_agent(agent)
self.agent_loads[agent] = 0
async def _dispatch_tasks(self):
"""分发任务(负载均衡)"""
while self.running:
try:
# 获取任务
task = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
# 选择负载最低的Agent
agent = self._select_least_loaded_agent(task)
if agent:
# 提交任务给Agent
await agent.submit_task(task['id'], task)
self.agent_loads[agent] += 1
logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__} (load: {self.agent_loads[agent]})")
else:
logger.warning(f"No available agent for task {task['id']}")
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Error dispatching task: {e}")
def _select_least_loaded_agent(self, task: Dict[str, Any]) -> Any:
"""选择负载最低的Agent"""
if not self.agents:
return None
# 找到负载最低的Agent
min_load = float('inf')
selected_agent = None
for agent in self.agents:
if self.agent_loads[agent] < min_load:
min_load = self.agent_loads[agent]
selected_agent = agent
return selected_agent
def update_agent_load(self, agent, delta: int):
"""更新Agent负载"""
if agent in self.agent_loads:
self.agent_loads[agent] += delta
logger.info(f"Agent {agent.__class__.__name__} load updated to {self.agent_loads[agent]}")
# 使用示例
async def main():
"""主函数"""
dispatcher = LoadBalancedDispatcher({})
# 注册Agent
agent1 = AsyncCodeGenerationAgent({})
agent2 = AsyncCodeGenerationAgent({})
agent3 = AsyncCodeGenerationAgent({})
await agent1.start()
await agent2.start()
await agent3.start()
dispatcher.register_agent(agent1)
dispatcher.register_agent(agent2)
dispatcher.register_agent(agent3)
# 启动分发器
await dispatcher.start()
# 提交多个任务
for i in range(10):
await dispatcher.submit_task({
'id': f'task{i}',
'type': 'code_generation',
'name': f'Task {i}',
'function_name': f'func{i}'
})
# 等待任务完成
await asyncio.sleep(5)
# 停止分发器
await dispatcher.stop()
# 停止Agent
await agent1.stop()
await agent2.stop()
await agent3.stop()
if __name__ == '__main__':
asyncio.run(main())
```> **任务分发特点**:
> - 负载均衡
> - 动态Agent选择
> - 负载监控
> - 任务路由
## 异步任务监控
### 1. 任务状态监控
````python
`python
# 示例:任务状态监控
用户请求:
"实现一个任务状态监控系统"
Claude Code 生成的代码:
```python
```python
import asyncio
from typing import Dict, Any
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class TaskMonitor:
"""任务监控器"""
def __init__(self):
self.tasks = {}
self.events = []
def register_task(self, task_id: str, task_info: Dict[str, Any]):
"""注册任务"""
self.tasks[task_id] = {
'id': task_id,
'status': 'pending',
'created_at': datetime.utcnow(),
'updated_at': datetime.utcnow(),
**task_info
}
self._log_event(task_id, 'registered')
logger.info(f"Task {task_id} registered")
def update_task_status(self, task_id: str, status: str, **kwargs):
"""更新任务状态"""
if task_id not in self.tasks:
logger.warning(f"Task {task_id} not found")
return
self.tasks[task_id]['status'] = status
self.tasks[task_id]['updated_at'] = datetime.utcnow()
self.tasks[task_id].update(kwargs)
self._log_event(task_id, f'status_changed_to_{status}')
logger.info(f"Task {task_id} status updated to {status}")
def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""获取任务状态"""
return self.tasks.get(task_id, {})
def get_all_tasks(self) -> Dict[str, Any]:
"""获取所有任务"""
return self.tasks
def get_tasks_by_status(self, status: str) -> Dict[str, Any]:
"""根据状态获取任务"""
return {
task_id: task
for task_id, task in self.tasks.items()
if task['status'] == status
}
def get_task_statistics(self) -> Dict[str, Any]:
"""获取任务统计"""
stats = {
'total': len(self.tasks),
'pending': 0,
'processing': 0,
'completed': 0,
'failed': 0,
'timeout': 0
}
for task in self.tasks.values():
status = task['status']
if status in stats:
stats[status] += 1
return stats
def _log_event(self, task_id: str, event: str):
"""记录事件"""
self.events.append({
'task_id': task_id,
'event': event,
'timestamp': datetime.utcnow()
})
def get_task_events(self, task_id: str) -> list:
"""获取任务事件"""
return [
event for event in self.events
if event['task_id'] == task_id
]
# 使用示例
async def main():
"""主函数"""
monitor = TaskMonitor()
# 注册任务
monitor.register_task('task1', {'name': 'Task 1', 'type': 'code_generation'})
monitor.register_task('task2', {'name': 'Task 2', 'type': 'code_review'})
monitor.register_task('task3', {'name': 'Task 3', 'type': 'test_generation'})
# 更新任务状态
monitor.update_task_status('task1', 'processing')
await asyncio.sleep(1)
monitor.update_task_status('task1', 'completed', result='success')
monitor.update_task_status('task2', 'processing')
await asyncio.sleep(1)
monitor.update_task_status('task2', 'failed', error='validation error')
monitor.update_task_status('task3', 'processing')
await asyncio.sleep(1)
monitor.update_task_status('task3', 'timeout')
# 获取任务状态
print("Task 1 status:", monitor.get_task_status('task1'))
print("Task 2 status:", monitor.get_task_status('task2'))
print("Task 3 status:", monitor.get_task_status('task3'))
# 获取任务统计
stats = monitor.get_task_statistics()
print("\nTask Statistics:")
for key, value in stats.items():
print(f" {key}: {value}")
# 获取任务事件
print("\nTask 1 events:")
for event in monitor.get_task_events('task1'):
print(f" {event['timestamp']}: {event['event']}")
if __name__ == '__main__':
asyncio.run(main())
**任务监控特点**:
- 实时状态跟踪
- 任务统计
- 事件记录
- 状态查询
```