Skip to content

28.5 性能优化

28.5.1 性能优化概述

Claude Code 的性能优化涉及多个层面,包括:

  1. 响应速度 :减少用户请求的响应时间
  2. 资源利用 :优化 CPU、内存、磁盘等资源的使用
  3. 并发处理 :提高并发请求的处理能力
  4. 缓存策略 :有效利用缓存减少重复计算
  5. 算法优化 :选择和实现高效的算法
## 28.5.2 缓存优化

### 1. 多级缓存架构

class MultiLevelCache:
    """Three-tier cache: in-memory L1, larger local L2, optional remote L3.

    Lookups fall through L1 -> L2 -> L3 and copy every hit into the faster
    tiers. L1 stores raw values and has no expiry of its own; L2 stores each
    value together with an absolute expiry timestamp; L3 is whatever client
    was installed through ``set_l3_cache``.
    """

    def __init__(self):
        self.l1_cache: Dict[str, Any] = {}  # in-memory tier, raw values
        self.l2_cache: Dict[str, Any] = {}  # local tier, {'value', 'expires'} records
        self.l3_cache: Optional["CacheClient"] = None  # distributed tier

        self.l1_size = 1000
        self.l1_hits = 0
        self.l1_misses = 0

        self.l2_size = 10000
        self.l2_hits = 0
        self.l2_misses = 0

    def set_l3_cache(self, cache_client: "CacheClient"):
        """Install the client used as the third (distributed) tier."""
        self.l3_cache = cache_client

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` on a full miss.

        Hits in L2 or L3 are promoted into the faster tiers. Expired L2
        records are dropped and counted as misses.
        """
        # L1: re-insert on hit so dict order tracks recency for _evict_lru.
        if key in self.l1_cache:
            self.l1_hits += 1
            value = self.l1_cache.pop(key)
            self.l1_cache[key] = value
            return value
        self.l1_misses += 1

        # L2: records wrap the value, so unwrap it and honour the expiry.
        record = self.l2_cache.pop(key, None)
        if record is not None and record['expires'] > time.time():
            self.l2_hits += 1
            self.l2_cache[key] = record  # back at the "most recent" end
            value = record['value']
            self._promote_to_l1(key, value)
            return value
        self.l2_misses += 1

        # L3: a None answer from the client is treated as a miss.
        if self.l3_cache is not None:
            value = self.l3_cache.get(key)
            if value is not None:
                self._promote_to_l2(key, value)
                self._promote_to_l1(key, value)
                return value

        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Write ``value`` to every tier; ``ttl`` (seconds) applies to L2 and L3."""
        self._set_l1(key, value)
        self._set_l2(key, value, ttl)
        if self.l3_cache is not None:
            self.l3_cache.set(key, value, ttl)

    def _set_l1(self, key: str, value: Any):
        """Store a raw value in L1, evicting one entry only when a new key needs room."""
        if key not in self.l1_cache and len(self.l1_cache) >= self.l1_size:
            self._evict_lru(self.l1_cache)
        self.l1_cache.pop(key, None)  # refresh recency on overwrite
        self.l1_cache[key] = value

    def _set_l2(self, key: str, value: Any, ttl: int):
        """Store a value in L2 together with its absolute expiry time."""
        if key not in self.l2_cache and len(self.l2_cache) >= self.l2_size:
            self._evict_lru(self.l2_cache)
        self.l2_cache.pop(key, None)  # refresh recency on overwrite
        self.l2_cache[key] = {
            'value': value,
            'expires': time.time() + ttl,
        }

    def _promote_to_l1(self, key: str, value: Any):
        """Copy a value found in a slower tier into L1."""
        self._set_l1(key, value)

    def _promote_to_l2(self, key: str, value: Any):
        """Copy a value found in L3 into L2 with the default one-hour TTL."""
        self._set_l2(key, value, 3600)

    def _evict_lru(self, cache: Dict[str, Any]):
        """Remove the least recently used entry of ``cache``.

        Dicts iterate in insertion order and every read/write re-inserts the
        touched key, so the first key is the least recently used one.
        """
        if not cache:
            return
        oldest_key = next(iter(cache))
        del cache[oldest_key]

    def get_stats(self) -> Dict[str, Any]:
        """Return size, hit/miss counters and hit rate for L1 and L2."""

        def hit_rate(hits: int, misses: int) -> float:
            total = hits + misses
            return hits / total if total > 0 else 0

        return {
            'l1': {
                'size': len(self.l1_cache),
                'hits': self.l1_hits,
                'misses': self.l1_misses,
                'hit_rate': hit_rate(self.l1_hits, self.l1_misses),
            },
            'l2': {
                'size': len(self.l2_cache),
                'hits': self.l2_hits,
                'misses': self.l2_misses,
                'hit_rate': hit_rate(self.l2_hits, self.l2_misses),
            },
        }

### 2. 智能缓存策略


    class SmartCache:
        """Cache that derives TTLs and eviction order from observed access patterns."""

        def __init__(self):
            self.cache: Dict[str, CacheEntry] = {}
            self.access_patterns: Dict[str, AccessPattern] = {}
            self.max_size = 10000

        def get(self, key: str) -> Optional[Any]:
            """Return the value stored under ``key``, or ``None`` if absent or expired.

            An expired entry is removed on the way out. A successful read is
            recorded in the key's access pattern.
            """
            entry = self.cache.get(key)

            if entry is None:
                return None

            if entry.is_expired():
                del self.cache[key]
                return None

            self._update_access_pattern(key)

            return entry.value

        def set(self, key: str, value: Any, ttl: int = None):
            """Store ``value`` under ``key``.

            When ``ttl`` is ``None`` it is predicted from the key's access
            history. Room is made only when a *new* key would push the cache
            past ``max_size``; overwriting an existing key never evicts.
            """
            if ttl is None:
                ttl = self._predict_ttl(key)

            entry = CacheEntry(
                key=key,
                value=value,
                ttl=ttl,
                created_at=time.time()
            )

            if key not in self.cache and len(self.cache) >= self.max_size:
                self._evict()

            self.cache[key] = entry

        def _update_access_pattern(self, key: str):
            """Record one access for ``key``, creating its pattern on first use."""
            if key not in self.access_patterns:
                self.access_patterns[key] = AccessPattern(key)

            self.access_patterns[key].record_access()

        def _predict_ttl(self, key: str) -> int:
            """Return a TTL in seconds chosen from the key's access frequency."""
            pattern = self.access_patterns.get(key)

            if pattern is None:
                return 3600  # default: 1 hour

            frequency = pattern.get_frequency()

            if frequency > 10:  # hot key
                return 7200  # 2 hours
            elif frequency > 5:  # warm key
                return 3600  # 1 hour
            else:  # cold key
                return 1800  # 30 minutes

        def _evict(self):
            """Free at least one slot in the cache.

            Expired entries are dropped first. Otherwise the cached key with
            the lowest access score is removed; keys that were never read
            have no pattern and score 0, so they are always candidates.
            """
            if not self.cache:
                return

            expired = [key for key, entry in self.cache.items() if entry.is_expired()]
            if expired:
                for key in expired:
                    del self.cache[key]
                return

            def score(key: str) -> float:
                pattern = self.access_patterns.get(key)
                return pattern.get_score() if pattern is not None else 0.0

            worst_key = min(self.cache, key=score)
            del self.cache[worst_key]
            self.access_patterns.pop(worst_key, None)

            # Keep the pattern table bounded: once it outgrows the cache limit,
            # drop history for keys that are no longer cached.
            if len(self.access_patterns) > self.max_size:
                self.access_patterns = {
                    key: pattern
                    for key, pattern in self.access_patterns.items()
                    if key in self.cache
                }

    class CacheEntry:
        """A cached value together with the data needed to decide when it expires."""

        def __init__(self, key: str, value: Any, ttl: int, created_at: float):
            self.key, self.value = key, value
            self.ttl, self.created_at = ttl, created_at

        def is_expired(self) -> bool:
            """Return True once more than ``ttl`` seconds have passed since creation.

            An entry whose ``ttl`` is ``None`` never expires.
            """
            if self.ttl is not None:
                return time.time() > self.created_at + self.ttl
            return False

    class AccessPattern:
        """Bounded history of access timestamps for a single cache key."""

        def __init__(self, key: str):
            self.key = key
            self.access_times: List[float] = []
            self.max_history = 100

        def record_access(self):
            """Append the current time and keep only the newest ``max_history`` stamps."""
            self.access_times.append(time.time())

            if self.max_history < len(self.access_times):
                self.access_times = self.access_times[-self.max_history:]

        def get_frequency(self) -> float:
            """Return recorded accesses per second over the recorded span.

            Fewer than two samples yield 0; a zero-length span yields the
            sample count itself.
            """
            count = len(self.access_times)
            if count < 2:
                return 0

            span = self.access_times[-1] - self.access_times[0]
            return count if span == 0 else count / span

        def get_score(self) -> float:
            """Return the eviction score: 70% frequency plus 30% recency."""
            weighted_frequency = self.get_frequency() * 0.7
            weighted_recency = self._get_recency() * 0.3
            return weighted_frequency + weighted_recency

        def _get_recency(self) -> float:
            """Return a value in (0, 1] that shrinks as the last access ages; 0 if never accessed."""
            if not self.access_times:
                return 0

            elapsed = time.time() - self.access_times[-1]
            return 1.0 / (1.0 + elapsed)

## 28.5.3 并发优化

### 1. 异步任务处理

    class AsyncTaskProcessor:
        """Runs submitted tasks on a fixed number of asyncio worker coroutines."""

        def __init__(self, max_workers: int = 4):
            self.task_queue: asyncio.Queue = asyncio.Queue()
            self.max_workers = max_workers
            self.workers: List[asyncio.Task] = []
            self.running = False

        async def start(self):
            """Spawn ``max_workers`` worker coroutines; a no-op if already running."""
            if self.running:
                # A second start() would otherwise double the worker count.
                return
            self.running = True
            for i in range(self.max_workers):
                worker = asyncio.create_task(self._worker(f"worker-{i}"))
                self.workers.append(worker)

        async def stop(self):
            """Ask the workers to exit and wait until all of them have finished.

            Workers notice the flag on their next queue poll, which times out
            after one second.
            """
            self.running = False
            await asyncio.gather(*self.workers, return_exceptions=True)
            # Drop finished handles so a later start() begins from a clean list.
            self.workers.clear()

        async def submit_task(self, task: "AsyncTask"):
            """Queue ``task`` for execution by the next free worker."""
            await self.task_queue.put(task)

        async def _worker(self, worker_name: str):
            """Pull tasks from the queue and execute them until ``running`` is cleared."""
            logger.info(f"{worker_name} started")
            while self.running:
                try:
                    # Poll with a timeout so the loop can re-check self.running.
                    task = await asyncio.wait_for(
                        self.task_queue.get(),
                        timeout=1.0
                    )
                    await self._execute_task(worker_name, task)
                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    logger.error(f"{worker_name} error: {e}")
            logger.info(f"{worker_name} stopped")

        async def _execute_task(self, worker_name: str, task: "AsyncTask"):
            """Await ``task.execute()`` and record status, result/error and timing on the task."""
            task.status = "running"
            task.started_at = time.time()
            try:
                result = await task.execute()
                task.result = result
                task.status = "completed"
            except Exception as e:
                task.error = str(e)
                task.status = "failed"
            finally:
                task.completed_at = time.time()
                task.duration = task.completed_at - task.started_at

### 2. 任务池管理


    class TaskPool:
        """Bounded registry of asynchronous tasks, keyed by task id."""

        def __init__(self, max_size: int = 100):
            self.max_size = max_size
            self.tasks: Dict[str, AsyncTask] = {}
            self.task_queue: asyncio.Queue = asyncio.Queue()
            self.completed_tasks: List[str] = []

        async def add_task(self, task: "AsyncTask") -> str:
            """Register ``task`` and queue it; returns the task's id.

            When the pool is full this waits until finished tasks can be
            cleared out to make room.
            """
            task_id = task.id

            if len(self.tasks) >= self.max_size:
                await self._wait_for_completion()

            self.tasks[task_id] = task
            await self.task_queue.put(task)

            return task_id

        async def get_task_result(self, task_id: str, timeout: float = None) -> Any:
            """Wait for the task to finish and return its result.

            Args:
                task_id: id returned by ``add_task``.
                timeout: maximum seconds to wait; ``None`` waits indefinitely,
                    ``0`` checks once without waiting.

            Raises:
                ValueError: no task with that id is registered.
                TimeoutError: the task did not finish within ``timeout``.
                Exception: the task finished with status ``"failed"``; the
                    message is the task's recorded error.
            """
            task = self.tasks.get(task_id)

            if not task:
                raise ValueError(f"Task not found: {task_id}")

            if task.status != "completed":
                await self._wait_for_task(task, timeout)

            if task.status == "failed":
                raise Exception(task.error)

            return task.result

        async def _wait_for_completion(self):
            """Poll until finished tasks can be cleared from a full pool.

            All tasks found in a terminal state are moved to
            ``completed_tasks`` in one pass.
            """
            while len(self.tasks) >= self.max_size:
                completed = [tid for tid, task in self.tasks.items()
                             if task.status in ("completed", "failed")]

                if completed:
                    for tid in completed:
                        del self.tasks[tid]
                        self.completed_tasks.append(tid)
                    break

                await asyncio.sleep(0.1)

        async def _wait_for_task(self, task: "AsyncTask", timeout: float):
            """Poll ``task.status`` until it is terminal or ``timeout`` seconds elapse."""
            # Compare against None so that timeout=0 means "do not wait" rather
            # than "wait forever"; use a monotonic clock so system clock
            # adjustments cannot stretch or shorten the wait.
            deadline = None if timeout is None else time.monotonic() + timeout

            while task.status not in ("completed", "failed"):
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(f"Task timeout: {task.id}")

                await asyncio.sleep(0.1)

## 28.5.4 算法优化

### 1. 字符串匹配优化

    class StringMatcher:
        """Finds occurrences of registered patterns in text using a trie."""

        def __init__(self):
            self.patterns: Dict[str, List[str]] = {}
            self.trie: Trie = Trie()

        def add_pattern(self, category: str, pattern: str):
            """Register ``pattern`` under ``category`` and insert it into the trie."""
            self.patterns.setdefault(category, []).append(pattern)
            self.trie.insert(pattern)

        def match(self, text: str) -> "List[MatchResult]":
            """Return one ``MatchResult`` per pattern occurrence found in ``text``."""
            results = []
            # Trie.search yields plain dicts with 'pattern', 'start' and 'end'
            # keys, so they are read by key rather than by attribute.
            for hit in self.trie.search(text):
                start, end = hit['start'], hit['end']
                results.append(MatchResult(
                    pattern=hit['pattern'],
                    start=start,
                    end=end,
                    text=text[start:end]
                ))
            return results
    class TrieNode:
        """One node of the pattern trie."""

        def __init__(self):
            self.children: Dict[str, TrieNode] = {}  # next character -> child node
            self.is_end = False  # True when a stored pattern ends at this node
            self.pattern: Optional[str] = None  # the pattern ending here, if any
    class Trie:
        """Character trie holding a set of patterns for multi-pattern search."""

        def __init__(self):
            self.root = TrieNode()

        def insert(self, pattern: str):
            """Add ``pattern`` to the trie, creating nodes as needed."""
            node = self.root
            for char in pattern:
                if char not in node.children:
                    node.children[char] = TrieNode()
                node = node.children[char]
            node.is_end = True
            node.pattern = pattern

        def search(self, text: str) -> List[Dict[str, Any]]:
            """Return every occurrence of a stored pattern in ``text``.

            Each hit is a dict with keys ``'pattern'``, ``'start'`` and
            ``'end'`` (end exclusive). Overlapping matches and patterns that
            are prefixes of longer patterns are all reported.
            """
            matches = []
            for i in range(len(text)):
                # Walk down from the root for as long as the text keeps matching.
                node = self.root
                j = i
                while j < len(text) and text[j] in node.children:
                    node = node.children[text[j]]
                    j += 1
                    if node.is_end:
                        matches.append({
                            'pattern': node.pattern,
                            'start': i,
                            'end': j
                        })
            return matches

### 2. 向量搜索优化


    class VectorSearcher:
        """Nearest-neighbour search over fixed-dimension vectors using a FAISS flat L2 index."""

        def __init__(self, dimension: int):
            self.dimension = dimension
            self.vectors: Dict[str, np.ndarray] = {}
            self.index: Optional[faiss.Index] = None

        def add_vector(self, vector_id: str, vector: np.ndarray):
            """Store ``vector`` under ``vector_id``.

            Raises:
                ValueError: the vector's length differs from ``dimension``.
            """
            if len(vector) != self.dimension:
                raise ValueError("Vector dimension mismatch")

            self.vectors[vector_id] = vector
            # Any existing index no longer reflects the stored vectors; drop it
            # so the next search rebuilds instead of returning stale results.
            self.index = None

        def build_index(self):
            """(Re)build the index from all stored vectors; leaves it unset when empty."""
            if not self.vectors:
                self.index = None
                return

            # FAISS expects a contiguous float32 matrix of shape (n, dimension).
            matrix = np.ascontiguousarray(
                np.stack(list(self.vectors.values())), dtype=np.float32
            )
            index = faiss.IndexFlatL2(self.dimension)
            index.add(matrix)
            self.index = index

        def search(self, query: np.ndarray, k: int = 10) -> "List[SearchResult]":
            """Return up to ``k`` stored vectors closest to ``query``.

            Results carry the vector id, the distance reported by the index
            and a similarity of ``1 / (1 + distance)``. An empty store yields
            an empty list.

            Raises:
                ValueError: the query's size differs from ``dimension``.
            """
            flat_query = np.ascontiguousarray(query, dtype=np.float32).reshape(1, -1)
            if flat_query.shape[1] != self.dimension:
                raise ValueError("Vector dimension mismatch")

            if self.index is None:
                self.build_index()
            if self.index is None:
                return []

            distances, indices = self.index.search(flat_query, k)

            # Row i of the index corresponds to the i-th key in insertion order.
            vector_ids = list(self.vectors.keys())
            results = []
            for distance, idx in zip(distances[0], indices[0]):
                # Unfilled result slots (fewer than k neighbours) carry a
                # negative index and are skipped.
                if 0 <= idx < len(vector_ids):
                    distance = float(distance)
                    results.append(SearchResult(
                        vector_id=vector_ids[idx],
                        distance=distance,
                        similarity=1.0 / (1.0 + distance)
                    ))

            return results

## 28.5.5 资源优化

### 1. 内存管理

    class MemoryManager:
        """Tracks logical memory allocations against a fixed byte budget."""

        def __init__(self, max_memory: int = 1024 * 1024 * 1024):  # 1GB
            self.max_memory = max_memory
            self.used_memory = 0
            self.allocations: Dict[str, MemoryAllocation] = {}

        def allocate(self, allocation_id: str, size: int) -> bool:
            """Reserve ``size`` bytes under ``allocation_id``.

            If the budget would be exceeded, the oldest allocations are
            released to make room. Re-using an id replaces the previous
            allocation. Returns ``False``, leaving all state untouched, when
            the request cannot be satisfied.
            """
            # Take the previous allocation for this id out of the accounting
            # first, otherwise overwriting it would leak its size.
            previous = self.allocations.pop(allocation_id, None)
            if previous is not None:
                self.used_memory -= previous.size

            if self.used_memory + size > self.max_memory:
                if not self._free_memory(size):
                    if previous is not None:
                        # Roll back: the old allocation stays in place.
                        self.allocations[allocation_id] = previous
                        self.used_memory += previous.size
                    return False

            allocation = MemoryAllocation(
                id=allocation_id,
                size=size,
                allocated_at=time.time()
            )
            self.allocations[allocation_id] = allocation
            self.used_memory += size
            return True

        def deallocate(self, allocation_id: str):
            """Release the allocation with this id; unknown ids are ignored."""
            allocation = self.allocations.pop(allocation_id, None)
            if allocation is not None:
                self.used_memory -= allocation.size

        def _free_memory(self, required_size: int) -> bool:
            """Release oldest allocations until ``required_size`` more bytes fit.

            Nothing is released when even releasing everything would not be
            enough; in that case ``False`` is returned.
            """
            shortfall = self.used_memory + required_size - self.max_memory
            if shortfall <= 0:
                return True

            oldest_first = sorted(
                self.allocations.values(),
                key=lambda x: x.allocated_at
            )
            if sum(allocation.size for allocation in oldest_first) < shortfall:
                return False

            freed = 0
            for allocation in oldest_first:
                self.deallocate(allocation.id)
                freed += allocation.size
                if freed >= shortfall:
                    break
            return True

        def get_stats(self) -> Dict[str, Any]:
            """Return budget, usage, utilisation ratio and allocation count."""
            return {
                'max_memory': self.max_memory,
                'used_memory': self.used_memory,
                'free_memory': self.max_memory - self.used_memory,
                # A zero budget has no meaningful ratio; report 0.0 instead of dividing by zero.
                'utilization': self.used_memory / self.max_memory if self.max_memory else 0.0,
                'num_allocations': len(self.allocations)
            }

### 2. 连接池管理


    class ConnectionPool:
        """Pool that hands out connections created on demand by a factory callable."""

        def __init__(self, factory: callable, max_size: int = 10):
            self.factory = factory
            self.max_size = max_size
            self.pool: List[Any] = []      # idle connections, reused newest-first
            self.in_use: Set[Any] = set()  # connections currently handed out

        def acquire(self, timeout: float = 30.0) -> Any:
            """Return an idle connection, or a new one while under ``max_size``.

            Polls every 0.1 seconds when the pool is exhausted.

            Raises:
                TimeoutError: no connection became available within ``timeout`` seconds.
            """
            began = time.time()

            while True:
                # Prefer an idle connection; otherwise create one if the
                # number of checked-out connections is still below the cap.
                if self.pool or len(self.in_use) < self.max_size:
                    connection = self.pool.pop() if self.pool else self.factory()
                    self.in_use.add(connection)
                    return connection

                waited = time.time() - began
                if waited > timeout:
                    raise TimeoutError("Failed to acquire connection")

                time.sleep(0.1)

        def release(self, connection: Any):
            """Return a checked-out connection to the idle list; others are ignored."""
            if connection not in self.in_use:
                return
            self.in_use.remove(connection)
            self.pool.append(connection)

        def close_all(self):
            """Close every idle and checked-out connection, then forget them all.

            A failure while closing one connection is logged and does not
            stop the remaining connections from being closed.
            """
            for group in (self.pool, self.in_use):
                for connection in group:
                    try:
                        connection.close()
                    except Exception as e:
                        logger.error(f"Error closing connection: {e}")

            self.pool.clear()
            self.in_use.clear()

## 28.5.6 性能监控

### 1. 性能指标收集

    class PerformanceMonitor:
        """Collects named metric samples, counters and timers."""

        def __init__(self):
            self.metrics: Dict[str, List[float]] = {}
            self.counters: Dict[str, int] = {}
            self.timers: Dict[str, float] = {}

        def record_metric(self, name: str, value: float):
            """Append ``value`` to the samples recorded under ``name``."""
            self.metrics.setdefault(name, []).append(value)

        def increment_counter(self, name: str, value: int = 1):
            """Add ``value`` to the counter ``name``, starting from 0."""
            self.counters[name] = self.counters.get(name, 0) + value

        def start_timer(self, name: str):
            """Start (or restart) the timer ``name``."""
            # perf_counter is monotonic, so durations are not distorted by
            # system clock adjustments.
            self.timers[name] = time.perf_counter()

        def stop_timer(self, name: str) -> float:
            """Stop the timer and record its duration as metric ``"<name>_duration"``.

            Returns:
                Elapsed seconds since the matching ``start_timer`` call.

            Raises:
                ValueError: no timer with that name is running.
            """
            if name not in self.timers:
                raise ValueError(f"Timer not found: {name}")
            duration = time.perf_counter() - self.timers.pop(name)
            self.record_metric(f"{name}_duration", duration)
            return duration

        def get_summary(self) -> Dict[str, Any]:
            """Return a copy of the counters plus count/mean/min/max/p50/p95/p99 per metric."""
            summary = {
                'metrics': {},
                'counters': self.counters.copy()
            }
            for name, values in self.metrics.items():
                summary['metrics'][name] = {
                    'count': len(values),
                    'mean': sum(values) / len(values),
                    'min': min(values),
                    'max': max(values),
                    'p50': np.percentile(values, 50),
                    'p95': np.percentile(values, 95),
                    'p99': np.percentile(values, 99)
                }
            return summary

### 2. 性能分析器


    class PerformanceProfiler:
        """Manual enter/exit profiler that aggregates call durations per function name."""

        def __init__(self):
            self.call_stack: List[ProfileEntry] = []
            self.entries: List[ProfileEntry] = []

        def enter(self, function_name: str):
            """Open an entry for ``function_name``, nested under the current top of stack."""
            entry = ProfileEntry(
                function_name=function_name,
                start_time=time.time(),
                parent=self.call_stack[-1] if self.call_stack else None
            )

            self.call_stack.append(entry)
            self.entries.append(entry)

        def exit(self):
            """Close the most recently opened entry; a no-op when nothing is open."""
            if not self.call_stack:
                return

            entry = self.call_stack.pop()
            entry.end_time = time.time()
            entry.duration = entry.end_time - entry.start_time

        def get_report(self) -> str:
            """Return a text report of per-function call statistics.

            Functions are listed by total time, largest first. Entries that
            are still open have no duration yet and are left out, so the
            report can be produced while profiling is in progress.
            """
            function_stats: Dict[str, Dict[str, Any]] = {}

            for entry in self.entries:
                if entry.duration is None:
                    continue  # still on the call stack; nothing to aggregate

                stats = function_stats.setdefault(entry.function_name, {
                    'count': 0,
                    'total_duration': 0.0,
                    'min_duration': float('inf'),
                    'max_duration': 0.0
                })
                stats['count'] += 1
                stats['total_duration'] += entry.duration
                stats['min_duration'] = min(stats['min_duration'], entry.duration)
                stats['max_duration'] = max(stats['max_duration'], entry.duration)

            report = [
                "Performance Profile Report",
                "=" * 50,
                "",
            ]

            for function_name, stats in sorted(
                function_stats.items(),
                key=lambda x: x[1]['total_duration'],
                reverse=True
            ):
                report.append(f"Function: {function_name}")
                report.append(f"  Calls: {stats['count']}")
                report.append(f"  Total Time: {stats['total_duration']:.4f}s")
                report.append(f"  Avg Time: {stats['total_duration'] / stats['count']:.4f}s")
                report.append(f"  Min Time: {stats['min_duration']:.4f}s")
                report.append(f"  Max Time: {stats['max_duration']:.4f}s")
                report.append("")

            return "\n".join(report)

    class ProfileEntry:
        """Timing record for one profiled call."""

        def __init__(self, function_name: str, start_time: float,
                     parent: Optional['ProfileEntry']):
            # Identity of the call and where it sits in the call tree.
            self.function_name = function_name
            self.parent = parent
            # Timing: the end and duration stay None until set by the profiler.
            self.start_time = start_time
            self.end_time = self.duration = None

## 28.5.7 性能优化最佳实践

### 1. 优化原则

  1. 测量优先 :先测量再优化,避免过早优化
  2. 热点优化 :专注于性能瓶颈和热点代码
  3. 渐进优化 :逐步优化,每次优化后验证效果
  4. 权衡考虑 :在性能、可读性、可维护性之间权衡
  5. 持续监控 :建立持续的性能监控机制

### 2. 优化策略

  1. 缓存优先 :有效利用缓存减少重复计算
  2. 异步处理 :使用异步处理提高并发能力
  3. 算法优化 :选择合适的算法和数据结构
  4. 资源管理 :合理管理内存、连接等资源
  5. 批量处理 :批量处理减少开销

### 3. 监控建议

  1. 关键指标 :监控响应时间、吞吐量、资源使用率
  2. 告警机制 :设置合理的告警阈值
  3. 趋势分析 :分析性能趋势,提前发现问题
  4. 定期审查 :定期审查性能指标和优化效果
  5. 文档记录 :记录优化过程和经验教训

通过系统的性能优化,Claude Code 可以提供更快速、更高效的编程辅助体验。

基于 MIT 许可发布 | 永久导航