Skip to content

26.2 非同步子Agent任務

非同步子Agent任務概述

非同步子Agent任務是Claude Code中處理長時間執行任務的重要機制。透過非同步執行,子Agent可以在後臺執行任務,主Agent可以繼續處理其他請求,從而提高整體效率和響應速度。

非同步任務的基本概念

1. 同步 vs 非同步

範例:同步 vs 非同步

同步執行:

python
    python


    ````python

    ````python

    # 示例:基础异步子Agent

    用户请求:
    "创建一个异步代码生成子Agent"

    Claude Code 生成的代码:

    ```python

    ```python

    import asyncio
    from typing import Dict, Any
    import logging

    logger = logging.getLogger(__name__)

    class AsyncCodeGenerationAgent:
        """Asynchronous code-generation sub-agent.

        Tasks are queued with ``submit_task``, processed one at a time by a
        background asyncio task created in ``start``, and their results are
        stored in ``self.results`` keyed by task id.
        """

        def __init__(self, context):
            """Store ``context`` and create an empty queue and result store."""
            self.context = context
            self.task_queue = asyncio.Queue()
            self.results = {}
            self.running = False
            # Strong reference to the background processor task. The event loop
            # itself only keeps weak references to tasks.
            self._processor = None

        async def start(self):
            """Mark the agent as running and launch the background processor."""
            self.running = True
            logger.info("AsyncCodeGenerationAgent started")

            # Keep the handle so the task cannot be garbage-collected mid-run.
            self._processor = asyncio.create_task(self._process_tasks())

        async def stop(self):
            """Ask the processor loop to exit.

            Only the ``running`` flag is cleared; the loop notices it after its
            current iteration (at the latest when the 1 s queue poll times out).
            """
            self.running = False
            logger.info("AsyncCodeGenerationAgent stopped")

        async def submit_task(self, task_id: str, task: Dict[str, Any]):
            """Queue ``task`` under ``task_id`` for background processing."""
            await self.task_queue.put((task_id, task))
            logger.info(f"Task {task_id} submitted")

        async def get_result(self, task_id: str) -> Dict[str, Any]:
            """Wait until a result for ``task_id`` exists and return it.

            Polls ``self.results`` every 0.1 s. There is no timeout here, so an
            id that is never processed is waited for indefinitely.
            """
            while task_id not in self.results:
                await asyncio.sleep(0.1)

            return self.results[task_id]

        async def _process_tasks(self):
            """Consume queued tasks until ``self.running`` becomes false.

            Every dequeued task ends up with an entry in ``self.results``:
            either the dict returned by ``_generate_code`` or a
            ``{'status': 'failed', ...}`` record when generation raised.
            """
            while self.running:
                try:
                    # Short timeout so the loop re-checks ``self.running``.
                    task_id, task = await asyncio.wait_for(
                        self.task_queue.get(),
                        timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    logger.error(f"Error processing task: {e}")
                    continue

                logger.info(f"Processing task {task_id}")

                try:
                    result = await self._generate_code(task)
                except Exception as e:
                    logger.error(f"Error processing task: {e}")
                    # Record the failure; without an entry get_result() would
                    # keep polling forever for this task id.
                    self.results[task_id] = {
                        'error': str(e),
                        'status': 'failed',
                        'task_id': task_id
                    }
                    continue

                self.results[task_id] = result
                logger.info(f"Task {task_id} completed")

        async def _generate_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
            """Simulate a slow generation job and return its result dict.

            Reads ``task['name']`` and ``task['function_name']`` (``KeyError``
            if either is missing). The returned ``'task_id'`` is taken from the
            task dict itself and is ``None`` when the dict has no such key.
            """
            # Stand-in for a long-running operation.
            await asyncio.sleep(2)

            code = f"""
    # Generated code for {task['name']}
    def {task['function_name']}():
        pass
    """

            return {
                'code': code,
                'status': 'completed',
                'task_id': task.get('task_id')
            }

    # 使用示例
    async def main():
        """Run three generation tasks through one agent and print each status."""
        agent = AsyncCodeGenerationAgent({})
        await agent.start()

        task_numbers = (1, 2, 3)

        # Queue every task before waiting on any result.
        for number in task_numbers:
            payload = {'name': f'Task {number}', 'function_name': f'func{number}'}
            await agent.submit_task(f'task{number}', payload)

        # Collect all results first, then report them in submission order.
        outcomes = [await agent.get_result(f'task{number}') for number in task_numbers]
        for number, outcome in zip(task_numbers, outcomes):
            print(f"Task {number} result:", outcome['status'])

        await agent.stop()

    if __name__ == '__main__':
        asyncio.run(main())

    ```

    > **非同步子Agent特點**:

    > - 使用asyncio實現非同步處理
    > - 任務佇列管理
    > - 結果儲存和檢索
    > - 後臺任務處理

    ### 2. 高階非同步子Agent

    ````python
    `python

    # 示例:高级异步子Agent

    用户请求:
    "创建一个支持任务优先级和超时的异步子Agent"

    Claude Code 生成的代码:

    ```python

    ```python

    import asyncio
    from typing import Dict, Any, Optional
    from datetime import datetime, timedelta
    import heapq
    import logging

    logger = logging.getLogger(__name__)

    class AsyncPriorityAgent:
        """Asynchronous sub-agent with task priorities and per-task timeouts.

        Submitted tasks are kept in a heap ordered by descending priority
        (ties are served in submission order). ``max_workers`` asyncio tasks
        pull from that heap, run the task, and store an outcome record in
        ``self.results`` keyed by task id.
        """

        def __init__(self, context, max_workers=4):
            """Store ``context`` and set up the empty queue, results and counters."""
            self.context = context
            # Heap of (-priority, sequence, task_data) entries.
            self.task_queue = []
            self.task_lock = asyncio.Lock()
            self.results = {}
            self.running = False
            self.max_workers = max_workers
            self.workers = []
            self.task_stats = {
                'submitted': 0,
                'completed': 0,
                'failed': 0,
                'timeout': 0
            }
            # Monotonic tie-breaker: without it, two entries with the same
            # priority would make heapq compare the task_data dicts, which
            # raises TypeError.
            self._sequence = 0

        async def start(self):
            """Mark the agent as running and spawn ``max_workers`` worker tasks."""
            self.running = True
            logger.info(f"AsyncPriorityAgent started with {self.max_workers} workers")

            for i in range(self.max_workers):
                worker = asyncio.create_task(self._worker(f"worker-{i}"))
                self.workers.append(worker)

        async def stop(self):
            """Clear the running flag and wait for every worker task to finish."""
            self.running = False

            await asyncio.gather(*self.workers, return_exceptions=True)
            # Drop finished handles so a later start() does not accumulate them.
            self.workers.clear()

            logger.info("AsyncPriorityAgent stopped")

        async def submit_task(
            self,
            task_id: str,
            task: Dict[str, Any],
            priority: int = 0,
            timeout: Optional[float] = None
        ):
            """Queue ``task`` under ``task_id``.

            Args:
                task_id: Key under which the outcome is stored in ``self.results``.
                task: Task payload; its ``'type'`` selects the handler.
                priority: Larger values are served first.
                timeout: Per-task execution limit in seconds, or ``None`` for
                    no limit.
            """
            task_data = {
                'task_id': task_id,
                'task': task,
                'priority': priority,
                'timeout': timeout,
                # NOTE: naive UTC timestamp, kept as-is so stored values do not
                # change type; datetime.utcnow() is deprecated since Python 3.12.
                'submitted_at': datetime.utcnow(),
                'status': 'pending'
            }

            async with self.task_lock:
                # Negated priority turns heapq's min-heap into "highest first".
                heapq.heappush(self.task_queue, (-priority, self._sequence, task_data))
                self._sequence += 1

            self.task_stats['submitted'] += 1
            logger.info(f"Task {task_id} submitted with priority {priority}")

        async def get_result(self, task_id: str, timeout: float = 30.0) -> Dict[str, Any]:
            """Return the stored outcome for ``task_id``, polling every 0.1 s.

            Raises:
                TimeoutError: If no outcome appears within ``timeout`` seconds.
            """
            # The loop's monotonic clock is immune to wall-clock adjustments.
            loop = asyncio.get_running_loop()
            started = loop.time()

            while True:
                if task_id in self.results:
                    return self.results[task_id]

                if loop.time() - started > timeout:
                    raise TimeoutError(f"Task {task_id} timeout after {timeout}s")

                await asyncio.sleep(0.1)

        async def _worker(self, worker_name: str):
            """Worker loop: pull tasks and run them until the agent is stopped."""
            logger.info(f"{worker_name} started")

            while self.running:
                try:
                    task_data = await self._get_task()

                    if task_data is None:
                        # Queue is empty: back off briefly before polling again.
                        await asyncio.sleep(0.1)
                        continue

                    await self._run_task(worker_name, task_data)

                except Exception as e:
                    logger.error(f"{worker_name} error: {e}")
                    await asyncio.sleep(1)

            logger.info(f"{worker_name} stopped")

        async def _run_task(self, worker_name: str, task_data: Dict[str, Any]):
            """Execute one dequeued task and record its outcome and statistics."""
            task_id = task_data['task_id']
            task = task_data['task']
            timeout = task_data.get('timeout')

            logger.info(f"{worker_name} processing task {task_id}")

            try:
                # ``is not None`` so that an explicit timeout of 0 is honoured
                # instead of being treated as "no timeout".
                if timeout is not None:
                    result = await asyncio.wait_for(
                        self._execute_task(task),
                        timeout=timeout
                    )
                else:
                    result = await self._execute_task(task)

            except asyncio.TimeoutError:
                self.results[task_id] = {
                    'error': 'Task timeout',
                    'status': 'timeout',
                    'worker': worker_name
                }

                self.task_stats['timeout'] += 1
                logger.warning(f"{worker_name} task {task_id} timeout")

            except Exception as e:
                self.results[task_id] = {
                    'error': str(e),
                    'status': 'failed',
                    'worker': worker_name
                }

                self.task_stats['failed'] += 1
                logger.error(f"{worker_name} task {task_id} failed: {e}")

            else:
                self.results[task_id] = {
                    'result': result,
                    'status': 'completed',
                    'worker': worker_name,
                    'completed_at': datetime.utcnow()
                }

                self.task_stats['completed'] += 1
                logger.info(f"{worker_name} completed task {task_id}")

        async def _get_task(self) -> Optional[Dict[str, Any]]:
            """Pop the highest-priority task, or return ``None`` if none is queued."""
            async with self.task_lock:
                if self.task_queue:
                    _, _, task_data = heapq.heappop(self.task_queue)
                    task_data['status'] = 'processing'
                    return task_data

            return None

        async def _execute_task(self, task: Dict[str, Any]) -> Any:
            """Dispatch ``task`` to the handler selected by ``task['type']``.

            Unknown or missing types fall back to ``_default_task``.
            """
            task_type = task.get('type', 'default')

            if task_type == 'code_generation':
                return await self._generate_code(task)
            elif task_type == 'code_review':
                return await self._review_code(task)
            elif task_type == 'test_generation':
                return await self._generate_tests(task)
            else:
                return await self._default_task(task)

        async def _generate_code(self, task: Dict[str, Any]) -> str:
            """Simulate code generation (2 s) and return a code stub.

            Requires ``task['name']`` and ``task['function_name']``; a missing
            key raises ``KeyError``.
            """
            await asyncio.sleep(2)

            return f"""
    # Generated code for {task['name']}
    def {task['function_name']}():
        pass
    """

        async def _review_code(self, task: Dict[str, Any]) -> Dict[str, Any]:
            """Simulate a code review (1.5 s) and return an empty report."""
            await asyncio.sleep(1.5)

            return {
                'issues': [],
                'suggestions': [],
                'metrics': {}
            }

        async def _generate_tests(self, task: Dict[str, Any]) -> str:
            """Simulate test generation (1 s) and return a unittest skeleton."""
            await asyncio.sleep(1)

            return """
    import unittest

    class TestGeneratedCode(unittest.TestCase):
        pass
    """

        async def _default_task(self, task: Dict[str, Any]) -> Any:
            """Fallback handler: wait 1 s and report completion."""
            await asyncio.sleep(1)
            return {'result': 'completed'}

        def get_stats(self) -> Dict[str, Any]:
            """Return a snapshot of the counters, queue length and live workers."""
            return {
                'stats': self.task_stats.copy(),
                'queue_size': len(self.task_queue),
                # Count only workers whose asyncio task has not finished yet.
                'active_workers': sum(1 for worker in self.workers if not worker.done())
            }

    # 使用示例
    async def main():
        """Submit four tasks with different priorities and print their outcomes."""
        agent = AsyncPriorityAgent({}, max_workers=4)

        await agent.start()

        # The code_generation handler indexes task['function_name'], so every
        # code_generation payload must carry it; without it the task ends up
        # with status 'failed' because of a KeyError.
        await agent.submit_task(
            'task1',
            {'name': 'Low Priority', 'type': 'code_generation', 'function_name': 'low_priority'},
            priority=1
        )
        await agent.submit_task('task2', {'name': 'High Priority', 'type': 'code_review'}, priority=10)
        await agent.submit_task('task3', {'name': 'Medium Priority', 'type': 'test_generation'}, priority=5)
        # Generation takes 2 s, so the 1 s limit makes this task report 'timeout'.
        await agent.submit_task(
            'task4',
            {'name': 'Timeout Task', 'type': 'code_generation', 'function_name': 'timeout_task'},
            priority=8,
            timeout=1.0
        )

        # Report each task in turn; get_result raises TimeoutError when no
        # outcome shows up within its own waiting limit.
        for number in range(1, 5):
            try:
                result = await agent.get_result(f'task{number}')
                print(f"Task {number} result:", result['status'])
            except TimeoutError as e:
                print(f"Task {number} error: {e}")

        stats = agent.get_stats()
        print("\nStatistics:")
        print(f"  Submitted: {stats['stats']['submitted']}")
        print(f"  Completed: {stats['stats']['completed']}")
        print(f"  Failed: {stats['stats']['failed']}")
        print(f"  Timeout: {stats['stats']['timeout']}")
        print(f"  Queue size: {stats['queue_size']}")

        await agent.stop()

    if __name__ == '__main__':
        asyncio.run(main())

    ```

    > **高階非同步子Agent特點**:

    > - 支援任務優先順序
    > - 支援任務超時
    > - 多個工作協程(asyncio 任務,而非作業系統執行緒)
    > - 任務統計
    > - 錯誤處理

    ## 非同步任務模式

    ### 1. 任務編排模式

    ````python
    `python

    # 示例:任务编排

    用户请求:
    "实现一个任务编排器,协调多个异步子Agent"

    Claude Code 生成的代码:

    ```python

    ```python

    import asyncio
    from typing import Dict, Any, List
    import logging

    logger = logging.getLogger(__name__)

    class TaskOrchestrator:
        """Runs a workflow of tasks across registered agents.

        A workflow is a dict with a ``'tasks'`` list. Each task names an
        ``'id'``, a ``'type'``, the ``'agent'`` to use, its ``'params'`` and an
        optional list of ``'dependencies'`` (ids of tasks that must finish
        first). Tasks are executed one at a time, dependencies first.
        """

        def __init__(self, context):
            """Store ``context`` and start with no agents, workflow or results."""
            self.context = context
            self.agents = {}
            self.task_graph = {}
            self.results = {}

        def register_agent(self, name: str, agent):
            """Make ``agent`` available to workflow tasks under ``name``."""
            self.agents[name] = agent
            logger.info(f"Agent {name} registered")

        def define_workflow(self, workflow: Dict[str, Any]):
            """Set the workflow that ``execute_workflow`` will run."""
            self.task_graph = workflow
            logger.info("Workflow defined")

        async def execute_workflow(self) -> Dict[str, Any]:
            """Execute every task of the workflow and return all results.

            Returns:
                ``self.results``, mapping task id to the agent's return value.

            Raises:
                KeyError: If no workflow with a ``'tasks'`` list was defined.
                ValueError: If a task depends on an unknown task id, if the
                    dependencies form a cycle, or if a task names an
                    unregistered agent.
            """
            logger.info("Starting workflow execution")

            # Ordering up front means a dependency listed after its dependent
            # (or never listed at all) cannot leave the run waiting forever.
            for task in self._ordered_tasks():
                await self._execute_task(task)

            logger.info("Workflow execution completed")

            return self.results

        def _ordered_tasks(self) -> List[Dict[str, Any]]:
            """Return the workflow tasks ordered so dependencies come first.

            The listed order is kept wherever the dependencies allow it.
            """
            remaining = list(self.task_graph['tasks'])
            known_ids = {task['id'] for task in remaining}
            # Results already stored count as satisfied dependencies.
            satisfied = set(self.results)

            for task in remaining:
                unknown = [
                    dep for dep in task.get('dependencies', [])
                    if dep not in known_ids and dep not in satisfied
                ]
                if unknown:
                    raise ValueError(
                        f"Task {task['id']} depends on unknown task(s): {unknown}"
                    )

            ordered = []
            while remaining:
                for position, task in enumerate(remaining):
                    if all(dep in satisfied for dep in task.get('dependencies', [])):
                        break
                else:
                    # Nothing is runnable although tasks remain: a cycle.
                    stuck = [task['id'] for task in remaining]
                    raise ValueError(f"Circular dependency among tasks: {stuck}")

                ordered.append(remaining.pop(position))
                satisfied.add(task['id'])

            return ordered

        async def _execute_task(self, task: Dict[str, Any]):
            """Run one task on its agent and store the result under its id.

            Raises:
                ValueError: If ``task['agent']`` is not a registered agent.
            """
            task_id = task['id']
            task_type = task['type']
            agent_name = task['agent']
            dependencies = task.get('dependencies', [])

            logger.info(f"Executing task {task_id}")

            for dep_id in dependencies:
                await self._wait_for_task(dep_id)

            agent = self.agents.get(agent_name)
            if not agent:
                raise ValueError(f"Agent {agent_name} not found")

            # The agent's handler is chosen by task type; unknown types use
            # the agent's default handler.
            if task_type == 'code_generation':
                result = await agent._generate_code(task['params'])
            elif task_type == 'code_review':
                result = await agent._review_code(task['params'])
            elif task_type == 'test_generation':
                result = await agent._generate_tests(task['params'])
            else:
                result = await agent._default_task(task['params'])

            self.results[task_id] = result

            logger.info(f"Task {task_id} completed")

        async def _wait_for_task(self, task_id: str):
            """Poll every 0.1 s until a result for ``task_id`` has been stored."""
            while task_id not in self.results:
                await asyncio.sleep(0.1)

    # 使用示例
    async def main():
        """Build a six-task workflow over three agents, run it and print statuses.

        The agents are stopped again once the workflow has finished or failed,
        so their background work does not outlive the run.
        """
        orchestrator = TaskOrchestrator({})

        code_agent = AsyncCodeGenerationAgent({})
        review_agent = AsyncCodeReviewAgent({})
        test_agent = AsyncTestGenerationAgent({})
        agents = (code_agent, review_agent, test_agent)

        for agent in agents:
            await agent.start()

        try:
            orchestrator.register_agent('code', code_agent)
            orchestrator.register_agent('review', review_agent)
            orchestrator.register_agent('test', test_agent)

            # Two generate -> review -> test chains; each review depends on its
            # generation task and each test task on both earlier steps.
            workflow = {
                'tasks': [
                    {
                        'id': 'task1',
                        'type': 'code_generation',
                        'agent': 'code',
                        'params': {'name': 'User Service', 'function_name': 'create_user'}
                    },
                    {
                        'id': 'task2',
                        'type': 'code_generation',
                        'agent': 'code',
                        'params': {'name': 'Product Service', 'function_name': 'create_product'}
                    },
                    {
                        'id': 'task3',
                        'type': 'code_review',
                        'agent': 'review',
                        'params': {'code': 'result from task1'},
                        'dependencies': ['task1']
                    },
                    {
                        'id': 'task4',
                        'type': 'code_review',
                        'agent': 'review',
                        'params': {'code': 'result from task2'},
                        'dependencies': ['task2']
                    },
                    {
                        'id': 'task5',
                        'type': 'test_generation',
                        'agent': 'test',
                        'params': {'code': 'result from task1'},
                        'dependencies': ['task1', 'task3']
                    },
                    {
                        'id': 'task6',
                        'type': 'test_generation',
                        'agent': 'test',
                        'params': {'code': 'result from task2'},
                        'dependencies': ['task2', 'task4']
                    }
                ]
            }

            orchestrator.define_workflow(workflow)

            results = await orchestrator.execute_workflow()

            print("Workflow results:")
            for task_id, result in results.items():
                # A handler may return something other than a dict (e.g. a
                # plain string); report such results as 'unknown' instead of
                # failing on .get().
                status = result.get('status', 'unknown') if isinstance(result, dict) else 'unknown'
                print(f"  {task_id}: {status}")
        finally:
            # NOTE(review): assumes the review/test agents expose stop() like
            # AsyncCodeGenerationAgent does - confirm against their definitions.
            for agent in agents:
                await agent.stop()

    if __name__ == '__main__':
        asyncio.run(main())

    ```

    > **任務編排特點**:

    > - 定義任務依賴關係
    > - 自動處理任務執行順序
    > - 支援並行執行
    > - 結果收集和傳遞

    ### 2. 任務分發模式

    ````python
    `python

    # 示例:任务分发

    用户请求:
    "实现一个任务分发器,将任务分发给多个异步子Agent"

    Claude Code 生成的代码:

    ```python

    ```python

    import asyncio
    from typing import Dict, Any, List
    import random
    import logging

    logger = logging.getLogger(__name__)

    class TaskDispatcher:
        """Forwards queued tasks to registered agents.

        Tasks submitted with ``submit_task`` are picked up by a background
        asyncio task and handed to an agent chosen by ``_select_agent``.
        """

        def __init__(self, context):
            """Store ``context`` and start with no agents and an empty queue."""
            self.context = context
            self.agents = []
            self.task_queue = asyncio.Queue()
            self.running = False
            # Strong reference to the background dispatch task. The event loop
            # itself only keeps weak references to tasks.
            self._dispatch_handle = None

        def register_agent(self, agent):
            """Add ``agent`` to the pool of dispatch targets."""
            self.agents.append(agent)
            logger.info(f"Agent registered, total: {len(self.agents)}")

        async def start(self):
            """Mark the dispatcher as running and launch the dispatch loop."""
            self.running = True
            logger.info("TaskDispatcher started")

            # Keep the handle so the task cannot be garbage-collected mid-run.
            self._dispatch_handle = asyncio.create_task(self._dispatch_tasks())

        async def stop(self):
            """Ask the dispatch loop to exit.

            Only the ``running`` flag is cleared; the loop notices it after its
            current iteration (at the latest when the 1 s queue poll times out).
            """
            self.running = False
            logger.info("TaskDispatcher stopped")

        async def submit_task(self, task: Dict[str, Any]):
            """Queue ``task`` for dispatching."""
            await self.task_queue.put(task)
            logger.info(f"Task submitted: {task.get('id', 'unknown')}")

        async def _dispatch_tasks(self):
            """Dispatch loop: hand each queued task to a selected agent.

            Errors (including a task without an ``'id'``) are logged and the
            loop carries on with the next task.
            """
            while self.running:
                try:
                    # Short timeout so the loop re-checks ``self.running``.
                    task = await asyncio.wait_for(
                        self.task_queue.get(),
                        timeout=1.0
                    )

                    agent = self._select_agent(task)

                    if agent:
                        await agent.submit_task(task['id'], task)
                        logger.info(f"Task {task['id']} dispatched to {agent.__class__.__name__}")
                    else:
                        logger.warning(f"No available agent for task {task['id']}")

                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    logger.error(f"Error dispatching task: {e}")

        def _select_agent(self, task: Dict[str, Any]) -> Any:
            """Pick an agent for ``task``.

            Returns the first registered agent whose ``can_handle(task_type)``
            is truthy; if none claims the type, a random registered agent;
            ``None`` when no agent is registered at all.
            """
            if not self.agents:
                return None

            task_type = task.get('type', 'default')

            for agent in self.agents:
                if hasattr(agent, 'can_handle') and agent.can_handle(task_type):
                    return agent

            # No agent claimed the type: fall back to a random one.
            return random.choice(self.agents)

    class LoadBalancedDispatcher(TaskDispatcher):
        """Dispatcher that always hands a task to the least-loaded agent.

        The load of an agent is the number of tasks dispatched to it, adjusted
        by whatever callers report through ``update_agent_load``.
        """

        def __init__(self, context):
            """Initialise the base dispatcher and an empty per-agent load table."""
            super().__init__(context)
            self.agent_loads = {}

        def register_agent(self, agent):
            """Register ``agent`` and start tracking its load at zero."""
            super().register_agent(agent)
            self.agent_loads[agent] = 0

        async def _dispatch_tasks(self):
            """Dispatch loop that routes each queued task by current load."""
            while self.running:
                try:
                    # Short timeout so the loop re-checks ``self.running``.
                    pending = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
                    target = self._select_least_loaded_agent(pending)

                    if not target:
                        logger.warning(f"No available agent for task {pending['id']}")
                        continue

                    await target.submit_task(pending['id'], pending)
                    self.agent_loads[target] += 1
                    logger.info(f"Task {pending['id']} dispatched to {target.__class__.__name__} (load: {self.agent_loads[target]})")

                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    logger.error(f"Error dispatching task: {e}")

        def _select_least_loaded_agent(self, task: Dict[str, Any]) -> Any:
            """Return the registered agent with the smallest load, or ``None``.

            Ties go to the agent registered first; ``task`` itself is not
            consulted.
            """
            if not self.agents:
                return None

            # min() keeps the first of several equally loaded agents.
            return min(self.agents, key=lambda candidate: self.agent_loads[candidate])

        def update_agent_load(self, agent, delta: int):
            """Add ``delta`` to the tracked load of ``agent`` if it is known."""
            if agent not in self.agent_loads:
                return

            self.agent_loads[agent] += delta
            logger.info(f"Agent {agent.__class__.__name__} load updated to {self.agent_loads[agent]}")

    # 使用示例
    async def main():
        """Spread ten generation tasks over three agents via the balancer."""
        dispatcher = LoadBalancedDispatcher({})

        # Create and start all agents first, then register them in order.
        pool = [AsyncCodeGenerationAgent({}) for _ in range(3)]
        for member in pool:
            await member.start()
        for member in pool:
            dispatcher.register_agent(member)

        await dispatcher.start()

        for i in range(10):
            payload = {
                'id': f'task{i}',
                'type': 'code_generation',
                'name': f'Task {i}',
                'function_name': f'func{i}'
            }
            await dispatcher.submit_task(payload)

        # Give the agents a fixed amount of time to work.
        await asyncio.sleep(5)

        # Shut down the dispatcher before the agents it feeds.
        await dispatcher.stop()
        for member in pool:
            await member.stop()

    if __name__ == '__main__':
        asyncio.run(main())

    ```

    > **任務分發特點**:

    > - 負載均衡
    > - 動態Agent選擇
    > - 負載監控
    > - 任務路由

    ## 非同步任務監控

    ### 1. 任務狀態監控

    ````python
    `python

    # 示例:任务状态监控

    用户请求:
    "实现一个任务状态监控系统"

    Claude Code 生成的代码:

    ```python

    ```python

    import asyncio
    from typing import Dict, Any
    from datetime import datetime
    import logging

    logger = logging.getLogger(__name__)

    class TaskMonitor:
        """In-memory registry of task states plus a log of state changes."""

        def __init__(self):
            """Start with no tasks and an empty event log."""
            self.tasks = {}
            self.events = []

        def register_task(self, task_id: str, task_info: Dict[str, Any]):
            """Record a new task in state ``'pending'``.

            Entries of ``task_info`` are merged last, so they override the
            default fields when the keys collide.
            """
            # One clock read so created_at and updated_at start out identical.
            # NOTE: naive UTC timestamp, kept as-is so stored values do not
            # change type; datetime.utcnow() is deprecated since Python 3.12.
            now = datetime.utcnow()
            self.tasks[task_id] = {
                'id': task_id,
                'status': 'pending',
                'created_at': now,
                'updated_at': now,
                **task_info
            }

            self._log_event(task_id, 'registered')
            logger.info(f"Task {task_id} registered")

        def update_task_status(self, task_id: str, status: str, **kwargs):
            """Set the status of a known task and merge extra fields into it.

            Unknown task ids are logged with a warning and otherwise ignored.
            """
            if task_id not in self.tasks:
                logger.warning(f"Task {task_id} not found")
                return

            self.tasks[task_id]['status'] = status
            self.tasks[task_id]['updated_at'] = datetime.utcnow()
            self.tasks[task_id].update(kwargs)

            self._log_event(task_id, f'status_changed_to_{status}')
            logger.info(f"Task {task_id} status updated to {status}")

        def get_task_status(self, task_id: str) -> Dict[str, Any]:
            """Return the stored record for ``task_id``, or ``{}`` if unknown."""
            return self.tasks.get(task_id, {})

        def get_all_tasks(self) -> Dict[str, Any]:
            """Return the mapping of all task records (not a copy)."""
            return self.tasks

        def get_tasks_by_status(self, status: str) -> Dict[str, Any]:
            """Return the records whose status equals ``status``, keyed by id."""
            return {
                task_id: task
                for task_id, task in self.tasks.items()
                if task['status'] == status
            }

        def get_task_statistics(self) -> Dict[str, Any]:
            """Return the total task count and a count per known status.

            Statuses outside the five known ones are included in ``'total'``
            only.
            """
            # Status buckets are kept apart from 'total' so that a task whose
            # status happens to be the string 'total' cannot inflate the total.
            per_status = {
                'pending': 0,
                'processing': 0,
                'completed': 0,
                'failed': 0,
                'timeout': 0
            }

            for task in self.tasks.values():
                status = task['status']
                if status in per_status:
                    per_status[status] += 1

            return {'total': len(self.tasks), **per_status}

        def _log_event(self, task_id: str, event: str):
            """Append a timestamped event for ``task_id`` to the event log."""
            self.events.append({
                'task_id': task_id,
                'event': event,
                'timestamp': datetime.utcnow()
            })

        def get_task_events(self, task_id: str) -> list:
            """Return the logged events of ``task_id`` in the order recorded."""
            return [
                event for event in self.events
                if event['task_id'] == task_id
            ]

    # 使用示例
    async def main():
        """Walk three tasks through their life cycle and print what was recorded."""
        monitor = TaskMonitor()

        registrations = (
            ('task1', {'name': 'Task 1', 'type': 'code_generation'}),
            ('task2', {'name': 'Task 2', 'type': 'code_review'}),
            ('task3', {'name': 'Task 3', 'type': 'test_generation'}),
        )
        for task_id, info in registrations:
            monitor.register_task(task_id, info)

        # Each task goes to 'processing', waits a second, then reaches its
        # final state together with any extra fields.
        transitions = (
            ('task1', 'completed', {'result': 'success'}),
            ('task2', 'failed', {'error': 'validation error'}),
            ('task3', 'timeout', {}),
        )
        for task_id, final_status, extra in transitions:
            monitor.update_task_status(task_id, 'processing')
            await asyncio.sleep(1)
            monitor.update_task_status(task_id, final_status, **extra)

        for number in (1, 2, 3):
            print(f"Task {number} status:", monitor.get_task_status(f'task{number}'))

        summary = monitor.get_task_statistics()
        print("\nTask Statistics:")
        for label, count in summary.items():
            print(f"  {label}: {count}")

        print("\nTask 1 events:")
        for entry in monitor.get_task_events('task1'):
            print(f"  {entry['timestamp']}: {entry['event']}")

    if __name__ == '__main__':
        asyncio.run(main())


    **任務監控特點**:

    - 實時狀態跟蹤
    - 任務統計
    - 事件記錄
    - 狀態查詢


    ```

基于 MIT 许可发布 | 永久导航