28.5 性能优化
28.5.1 性能优化概述
Claude Code 的性能优化涉及多个层面,包括:
- 响应速度 :减少用户请求的响应时间
- 资源利用 :优化 CPU、内存、磁盘等资源的使用
- 并发处理 :提高并发请求的处理能力
- 缓存策略 :有效利用缓存减少重复计算
- 算法优化 :选择和实现高效的算法
python
## 28.5.2 缓存优化
### 1\. 多级缓存架构
class MultiLevelCache: """多级缓存"""
def **init**(self): self.l1_cache: Dict[str, Any] = {} # 内存缓存 self.l2_cache: Dict[str, Any] = {} # 本地缓存 self.l3_cache: Optional[CacheClient] = None # 分布式缓存
self.l1_size = 1000 self.l1_hits = 0 self.l1_misses = 0
self.l2_size = 10000 self.l2_hits = 0 self.l2_misses = 0
def set_l3_cache(self, cache_client: CacheClient): """设置三级缓存""" self.l3_cache = cache_client
def get(self, key: str) -> Optional[Any]: """获取缓存值"""
# L1 缓存
if key in self.l1_cache: self.l1_hits += 1 return self.l1_cache[key] self.l1_misses += 1
# L2 缓存
if key in self.l2_cache: self.l2_hits += 1 value = self.l2_cache[key]
# 提升到 L1
self._promote_to_l1(key, value) return value self.l2_misses += 1
# L3 缓存
if self.l3_cache: value = self.l3_cache.get(key) if value is not None:
# 提升到 L2 和 L1self._promote_to_l2(key, value) self._promote_to_l1(key, value) return value
return None
python
def set(self, key: str, value: Any, ttl: int = 3600): """设置缓存值"""
# L1 缓存
self._set_l1(key, value)
# L2 缓存
self._set_l2(key, value, ttl)
# L3 缓存
if self.l3_cache: self.l3_cache.set(key, value, ttl)
def _set_l1(self, key: str, value: Any): """设置 L1 缓存""" if len(self.l1_cache) >= self.l1_size:
# LRU 淘汰
self._evict_lru(self.l1_cache) self.l1_cache[key] = value
def _set_l2(self, key: str, value: Any, ttl: int): """设置 L2 缓存""" if len(self.l2_cache) >= self.l2_size:
# LRU 淘汰
self._evict_lru(self.l2_cache) self.l2_cache[key] = { 'value': value, 'expires': time.time() + ttl }
def _promote_to_l1(self, key: str, value: Any): """提升到 L1 缓存""" self._set_l1(key, value)
def _promote_to_l2(self, key: str, value: Any): """提升到 L2 缓存""" self._set_l2(key, value, 3600)
def _evict_lru(self, cache: Dict[str, Any]): """淘汰 LRU 条目""" if not cache: return
# 简单实现:删除第一个条目
oldest_key = next(iter(cache)) del cache[oldest_key]
def get_stats(self) -> Dict[str, Any]: """获取缓存统计""" l1_hit_rate = self.l1_hits / (self.l1_hits + self.l1_misses) if (self.l1_hits + self.l1_misses) > 0 else 0 l2_hit_rate = self.l2_hits / (self.l2_hits + self.l2_misses) if (self.l2_hits + self.l2_misses) > 0 else 0
return { 'l1': { 'size': len(self.l1_cache), 'hits': self.l1_hits, 'misses': self.l1_misses, 'hit_rate': l1_hit_rate }, 'l2': { 'size': len(self.l2_cache), 'hits': self.l2_hits, 'misses': self.l2_misses, 'hit_rate': l2_hit_rate } }
### 2\. 智能缓存策略
bash
python
class SmartCache:
"""智能缓存"""
def __init__(self):
self.cache: Dict[str, CacheEntry] = {}
self.access_patterns: Dict[str, AccessPattern] = {}
self.max_size = 10000
def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
entry = self.cache.get(key)
if entry is None:
return None
# 检查是否过期
if entry.is_expired():
del self.cache[key]
return None
# 更新访问模式
self._update_access_pattern(key)
return entry.value
def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
# 预测 TTL
if ttl is None:
ttl = self._predict_ttl(key)
entry = CacheEntry(
key=key,
value=value,
ttl=ttl,
created_at=time.time()
)
# 检查缓存大小
if len(self.cache) >= self.max_size:
self._evict()
self.cache[key] = entry
def _update_access_pattern(self, key: str):
"""更新访问模式"""
if key not in self.access_patterns:
self.access_patterns[key] = AccessPattern(key)
pattern = self.access_patterns[key]
pattern.record_access()
def _predict_ttl(self, key: str) -> int:
"""预测 TTL"""
pattern = self.access_patterns.get(key)
if pattern is None:
return 3600 # 默认 1 小时
# 基于访问频率预测
frequency = pattern.get_frequency()
if frequency > 10: # 高频访问
return 7200 # 2 小时
elif frequency > 5: # 中频访问
return 3600 # 1 小时
else: # 低频访问
return 1800 # 30 分钟
def _evict(self):
"""淘汰缓存条目"""
# 基于访问模式进行淘汰
scores = {}
for key, pattern in self.access_patterns.items():
score = pattern.get_score()
scores[key] = score
# 淘汰分数最低的条目
if scores:
worst_key = min(scores, key=scores.get)
if worst_key in self.cache:
del self.cache[worst_key]
if worst_key in self.access_patterns:
del self.access_patterns[worst_key]
class CacheEntry:
"""缓存条目"""
def __init__(self, key: str, value: Any, ttl: int, created_at: float):
self.key = key
self.value = value
self.ttl = ttl
self.created_at = created_at
def is_expired(self) -> bool:
"""检查是否过期"""
if self.ttl is None:
return False
return time.time() > self.created_at + self.ttl
class AccessPattern:
"""访问模式"""
def __init__(self, key: str):
self.key = key
self.access_times: List[float] = []
self.max_history = 100
def record_access(self):
"""记录访问"""
self.access_times.append(time.time())
# 限制历史记录大小
if len(self.access_times) > self.max_history:
self.access_times = self.access_times[-self.max_history:]
def get_frequency(self) -> float:
"""获取访问频率"""
if len(self.access_times) < 2:
return 0
time_span = self.access_times[-1] - self.access_times[0]
if time_span == 0:
return len(self.access_times)
return len(self.access_times) / time_span
def get_score(self) -> float:
"""获取分数(用于淘汰决策)"""
frequency = self.get_frequency()
recency = self._get_recency()
# 综合频率和最近性
return frequency * 0.7 + recency * 0.3
def _get_recency(self) -> float:
"""获取最近性"""
if not self.access_times:
return 0
last_access = self.access_times[-1]
time_since_last = time.time() - last_access
# 越近访问,分数越高
return 1.0 / (1.0 + time_since_last)
## 28.5.3 并发优化
### 1. 异步任务处理
class AsyncTaskProcessor:
"""异步任务处理器"""
def __init__(self, max_workers: int = 4):
self.task_queue: asyncio.Queue = asyncio.Queue()
self.max_workers = max_workers
self.workers: List[asyncio.Task] = []
self.running = False
async def start(self):
"""启动处理器"""
self.running = True
# 创建工作协程
for i in range(self.max_workers):
worker = asyncio.create_task(self._worker(f"worker-{i}"))
self.workers.append(worker)
async def stop(self):
"""停止处理器"""
self.running = False
# 等待所有工作协程完成
await asyncio.gather(*self.workers, return_exceptions=True)
async def submit_task(self, task: AsyncTask):
"""提交任务"""
await self.task_queue.put(task)
async def _worker(self, worker_name: str):
"""工作协程"""
logger.info(f"{worker_name} started")
while self.running:
try:
# 获取任务
task = await asyncio.wait_for(
self.task_queue.get(),
timeout=1.0
)
# 执行任务
await self._execute_task(worker_name, task)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"{worker_name} error: {e}")
logger.info(f"{worker_name} stopped")
async def _execute_task(self, worker_name: str, task: AsyncTask):
"""执行任务"""
task.status = "running"
task.started_at = time.time()
try:
result = await task.execute()
task.result = result
task.status = "completed"
except Exception as e:
task.error = str(e)
task.status = "failed"
finally:
task.completed_at = time.time()
task.duration = task.completed_at - task.started_at
### 2\. 任务池管理
bash
python
class TaskPool:
"""任务池"""
def __init__(self, max_size: int = 100):
self.max_size = max_size
self.tasks: Dict[str, AsyncTask] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.completed_tasks: List[str] = []
async def add_task(self, task: AsyncTask) -> str:
"""添加任务"""
task_id = task.id
# 检查任务池大小
if len(self.tasks) >= self.max_size:
# 等待有任务完成
await self._wait_for_completion()
self.tasks[task_id] = task
await self.task_queue.put(task)
return task_id
async def get_task_result(self, task_id: str, timeout: float = None) -> Any:
"""获取任务结果"""
task = self.tasks.get(task_id)
if not task:
raise ValueError(f"Task not found: {task_id}")
# 等待任务完成
if task.status != "completed":
await self._wait_for_task(task, timeout)
if task.status == "failed":
raise Exception(task.error)
return task.result
async def _wait_for_completion(self):
"""等待任务完成"""
while len(self.tasks) >= self.max_size:
# 检查是否有完成的任务
completed = [tid for tid, task in self.tasks.items()
if task.status in ["completed", "failed"]]
if completed:
# 清理完成的任务
for tid in completed:
del self.tasks[tid]
self.completed_tasks.append(tid)
break
await asyncio.sleep(0.1)
async def _wait_for_task(self, task: AsyncTask, timeout: float):
"""等待任务完成"""
start_time = time.time()
while task.status not in ["completed", "failed"]:
if timeout and (time.time() - start_time) > timeout:
raise TimeoutError(f"Task timeout: {task.id}")
await asyncio.sleep(0.1)
## 28.5.4 算法优化
### 1. 字符串匹配优化
class StringMatcher:
"""字符串匹配器"""
def __init__(self):
self.patterns: Dict[str, List[str]] = {}
self.trie: Trie = Trie()
def add_pattern(self, category: str, pattern: str):
"""添加模式"""
if category not in self.patterns:
self.patterns[category] = []
self.patterns[category].append(pattern)
self.trie.insert(pattern)
def match(self, text: str) -> List[MatchResult]:
"""匹配文本"""
results = []
# 使用 Trie 进行快速匹配
matches = self.trie.search(text)
for match in matches:
result = MatchResult(
pattern=match.pattern,
start=match.start,
end=match.end,
text=text[match.start:match.end]
)
results.append(result)
return results
class TrieNode:
"""Trie 节点"""
def __init__(self):
self.children: Dict[str, TrieNode] = {}
self.is_end = False
self.pattern: Optional[str] = None
class Trie:
"""Trie 树"""
def __init__(self):
self.root = TrieNode()
def insert(self, pattern: str):
"""插入模式"""
node = self.root
for char in pattern:
if char not in node.children:
node.children[char] = TrieNode()
node = node.children[char]
node.is_end = True
node.pattern = pattern
def search(self, text: str) -> List[Dict[str, Any]]:
"""搜索文本"""
matches = []
for i in range(len(text)):
node = self.root
j = i
while j < len(text) and text[j] in node.children:
node = node.children[text[j]]
j += 1
if node.is_end:
matches.append({
'pattern': node.pattern,
'start': i,
'end': j
})
return matches
### 2\. 向量搜索优化
bash
python
class VectorSearcher:
"""向量搜索器"""
def __init__(self, dimension: int):
self.dimension = dimension
self.vectors: Dict[str, np.ndarray] = {}
self.index: Optional[faiss.Index] = None
def add_vector(self, vector_id: str, vector: np.ndarray):
"""添加向量"""
if len(vector) != self.dimension:
raise ValueError("Vector dimension mismatch")
self.vectors[vector_id] = vector
def build_index(self):
"""构建索引"""
if not self.vectors:
return
# 创建 FAISS 索引
self.index = faiss.IndexFlatL2(self.dimension)
# 添加向量
vectors = np.array(list(self.vectors.values()))
self.index.add(vectors)
def search(self, query: np.ndarray, k: int = 10) -> List[SearchResult]:
"""搜索相似向量"""
if not self.index:
self.build_index()
# 搜索
distances, indices = self.index.search(
query.reshape(1, -1),
k
)
# 构建结果
results = []
vector_ids = list(self.vectors.keys())
for i, idx in enumerate(indices[0]):
if idx >= 0 and idx < len(vector_ids):
result = SearchResult(
vector_id=vector_ids[idx],
distance=float(distances[0][i]),
similarity=1.0 / (1.0 + float(distances[0][i]))
)
results.append(result)
return results
## 28.5.5 资源优化
### 1. 内存管理
class MemoryManager:
"""内存管理器"""
def __init__(self, max_memory: int = 1024 * 1024 * 1024): # 1GB
self.max_memory = max_memory
self.used_memory = 0
self.allocations: Dict[str, MemoryAllocation] = {}
def allocate(self, allocation_id: str, size: int) -> bool:
"""分配内存"""
if self.used_memory + size > self.max_memory:
# 尝试释放一些内存
if not self._free_memory(size):
return False
allocation = MemoryAllocation(
id=allocation_id,
size=size,
allocated_at=time.time()
)
self.allocations[allocation_id] = allocation
self.used_memory += size
return True
def deallocate(self, allocation_id: str):
"""释放内存"""
allocation = self.allocations.get(allocation_id)
if allocation:
self.used_memory -= allocation.size
del self.allocations[allocation_id]
def _free_memory(self, required_size: int) -> bool:
"""释放内存"""
# 按使用时间排序
sorted_allocations = sorted(
self.allocations.values(),
key=lambda x: x.allocated_at
)
freed = 0
for allocation in sorted_allocations:
self.deallocate(allocation.id)
freed += allocation.size
if freed >= required_size:
return True
return False
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
'max_memory': self.max_memory,
'used_memory': self.used_memory,
'free_memory': self.max_memory - self.used_memory,
'utilization': self.used_memory / self.max_memory,
'num_allocations': len(self.allocations)
}
### 2\. 连接池管理
bash
python
class ConnectionPool:
"""连接池"""
def __init__(self, factory: callable, max_size: int = 10):
self.factory = factory
self.max_size = max_size
self.pool: List[Any] = []
self.in_use: Set[Any] = set()
def acquire(self, timeout: float = 30.0) -> Any:
"""获取连接"""
start_time = time.time()
while True:
# 检查是否有可用连接
if self.pool:
connection = self.pool.pop()
self.in_use.add(connection)
return connection
# 检查是否可以创建新连接
if len(self.in_use) < self.max_size:
connection = self.factory()
self.in_use.add(connection)
return connection
# 等待连接释放
if time.time() - start_time > timeout:
raise TimeoutError("Failed to acquire connection")
time.sleep(0.1)
def release(self, connection: Any):
"""释放连接"""
if connection in self.in_use:
self.in_use.remove(connection)
self.pool.append(connection)
def close_all(self):
"""关闭所有连接"""
for connection in self.pool:
try:
connection.close()
except Exception as e:
logger.error(f"Error closing connection: {e}")
for connection in self.in_use:
try:
connection.close()
except Exception as e:
logger.error(f"Error closing connection: {e}")
self.pool.clear()
self.in_use.clear()
## 28.5.6 性能监控
### 1. 性能指标收集
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics: Dict[str, List[float]] = {}
self.counters: Dict[str, int] = {}
self.timers: Dict[str, float] = {}
def record_metric(self, name: str, value: float):
"""记录指标"""
if name not in self.metrics:
self.metrics[name] = []
self.metrics[name].append(value)
def increment_counter(self, name: str, value: int = 1):
"""增加计数器"""
self.counters[name] = self.counters.get(name, 0) + value
def start_timer(self, name: str):
"""启动计时器"""
self.timers[name] = time.time()
def stop_timer(self, name: str) -> float:
"""停止计时器"""
if name not in self.timers:
raise ValueError(f"Timer not found: {name}")
duration = time.time() - self.timers[name]
del self.timers[name]
self.record_metric(f"{name}_duration", duration)
return duration
def get_summary(self) -> Dict[str, Any]:
"""获取摘要"""
summary = {
'metrics': {},
'counters': self.counters.copy()
}
for name, values in self.metrics.items():
summary['metrics'][name] = {
'count': len(values),
'mean': sum(values) / len(values),
'min': min(values),
'max': max(values),
'p50': np.percentile(values, 50),
'p95': np.percentile(values, 95),
'p99': np.percentile(values, 99)
}
return summary
### 2\. 性能分析器
bash
python
class PerformanceProfiler:
"""性能分析器"""
def __init__(self):
self.call_stack: List[ProfileEntry] = []
self.entries: List[ProfileEntry] = []
def enter(self, function_name: str):
"""进入函数"""
entry = ProfileEntry(
function_name=function_name,
start_time=time.time(),
parent=self.call_stack[-1] if self.call_stack else None
)
self.call_stack.append(entry)
self.entries.append(entry)
def exit(self):
"""退出函数"""
if not self.call_stack:
return
entry = self.call_stack.pop()
entry.end_time = time.time()
entry.duration = entry.end_time - entry.start_time
def get_report(self) -> str:
"""获取报告"""
# 按函数名分组
function_stats: Dict[str, Dict[str, Any]] = {}
for entry in self.entries:
if entry.function_name not in function_stats:
function_stats[entry.function_name] = {
'count': 0,
'total_duration': 0.0,
'min_duration': float('inf'),
'max_duration': 0.0
}
stats = function_stats[entry.function_name]
stats['count'] += 1
stats['total_duration'] += entry.duration
stats['min_duration'] = min(stats['min_duration'], entry.duration)
stats['max_duration'] = max(stats['max_duration'], entry.duration)
# 生成报告
report = []
report.append("Performance Profile Report")
report.append("=" * 50)
report.append("")
for function_name, stats in sorted(
function_stats.items(),
key=lambda x: x[1]['total_duration'],
reverse=True
):
report.append(f"Function: {function_name}")
report.append(f" Calls: {stats['count']}")
report.append(f" Total Time: {stats['total_duration']:.4f}s")
report.append(f" Avg Time: {stats['total_duration'] / stats['count']:.4f}s")
report.append(f" Min Time: {stats['min_duration']:.4f}s")
report.append(f" Max Time: {stats['max_duration']:.4f}s")
report.append("")
return "\n".join(report)
class ProfileEntry:
"""性能分析条目"""
def __init__(self, function_name: str, start_time: float,
parent: Optional['ProfileEntry']):
self.function_name = function_name
self.start_time = start_time
self.end_time = None
self.duration = None
self.parent = parent
## 28.5.7 性能优化最佳实践
### 1\. 优化原则- 测量优先 :先测量再优化,避免过早优化
- 热点优化 :专注于性能瓶颈和热点代码
- 渐进优化 :逐步优化,每次优化后验证效果
- 权衡考虑 :在性能、可读性、可维护性之间权衡
- 持续监控 :建立持续的性能监控机制
2. 优化策略
- 缓存优先 :有效利用缓存减少重复计算
- 异步处理 :使用异步处理提高并发能力
- 算法优化 :选择合适的算法和数据结构
- 资源管理 :合理管理内存、连接等资源
- 批量处理 :批量处理减少开销
3. 监控建议
- 关键指标 :监控响应时间、吞吐量、资源使用率
- 告警机制 :设置合理的告警阈值
- 趋势分析 :分析性能趋势,提前发现问题
- 定期审查 :定期审查性能指标和优化效果
- 文档记录 :记录优化过程和经验教训
通过系统的性能优化,Claude Code 可以提供更快速、更高效的编程辅助体验。