
# 15.4 Skills Performance Optimization

## Performance Optimization Overview

Performance optimization of Skills is key to improving the overall efficiency of Claude Code. This section takes an in-depth look at the main strategies and techniques for optimizing Skill performance.

## Optimization Strategies

### 1. Cache Optimization

#### 1.1 Multi-Level Caching

**Multi-level cache architecture**

Cache tiers:

1. L1 cache (in-memory)
   - Fastest access
   - Smallest capacity
   - Holds hot data
2. L2 cache (local disk)
   - Medium access speed
   - Medium capacity
   - Holds warm data
3. L3 cache (remote storage)
   - Slowest access
   - Largest capacity
   - Holds cold data

Cache eviction and expiry policies (a small sketch combining two of them follows this list):

- LRU (least recently used)
- LFU (least frequently used)
- TTL (time to live)
- Manual invalidation
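
Of these policies, LRU and TTL are the ones most often combined. Below is a minimal sketch of an in-memory L1 entry that pairs an LRU size bound with per-entry TTL expiry; it assumes the third-party `cachetools` package, and the class name `TTLLRUCache` is illustrative rather than part of Claude Code.

```python
# Sketch: an L1 tier with both an LRU size bound and per-entry TTL expiry.
# Assumes the third-party `cachetools` package; names here are illustrative.
import time
from cachetools import LRUCache


class TTLLRUCache:
    """LRU-bounded store whose entries also expire after `ttl` seconds."""

    def __init__(self, maxsize=100, ttl=3600):
        self.ttl = ttl
        self._store = LRUCache(maxsize=maxsize)  # evicts least recently used

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() > expires_at:        # TTL expired: treat as a miss
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.time() + self.ttl)
```
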
#### 1.2 Cache Implementation

```python
# LRUCache can come from the third-party `cachetools` package;
# DiskCache and RedisCache are assumed async cache wrappers (not shown here).
class MultiLevelCache:
    def __init__(self):
        self.l1_cache = LRUCache(maxsize=100)
        self.l2_cache = DiskCache(cache_dir="/tmp/skills_cache")
        self.l3_cache = RedisCache(host="localhost", port=6379)

    async def get(self, key):
        # L1 cache
        value = self.l1_cache.get(key)
        if value is not None:
            return value

        # L2 cache
        value = await self.l2_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
            return value

        # L3 cache
        value = await self.l3_cache.get(key)
        if value is not None:
            self.l1_cache[key] = value
            await self.l2_cache.set(key, value)
            return value

        return None

    async def set(self, key, value, ttl=3600):
        # Write through to every cache level
        self.l1_cache[key] = value
        await self.l2_cache.set(key, value, ttl)
        await self.l3_cache.set(key, value, ttl)

    async def invalidate(self, key):
        # Invalidate the key at every cache level
        if key in self.l1_cache:
            del self.l1_cache[key]
        await self.l2_cache.delete(key)
        await self.l3_cache.delete(key)
```
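
A short usage sketch for the cache above, assuming the three backing stores are configured as in the constructor; the key name and payload are invented for illustration.

```python
# Illustrative usage of MultiLevelCache (key name and payload are made up).
import asyncio

async def demo():
    cache = MultiLevelCache()
    await cache.set("code-review:src/app.py", {"issues": []}, ttl=600)
    result = await cache.get("code-review:src/app.py")  # served from L1
    print(result)

asyncio.run(demo())
```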

#### 1.3 Cache Key Design

```python
import hashlib
import json


class CacheKeyGenerator:
    @staticmethod
    def generate(skill_name, parameters, context_version):
        key_parts = [
            skill_name,
            hashlib.md5(json.dumps(parameters, sort_keys=True).encode()).hexdigest(),
            context_version
        ]
        return ":".join(key_parts)

    @staticmethod
    def generate_context_key(context):
        key_parts = [
            "context",
            hashlib.md5(json.dumps(context, sort_keys=True).encode()).hexdigest()
        ]
        return ":".join(key_parts)
```
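
For illustration, this is the kind of key the generator produces (the skill name, parameters, and context version below are made up):

```python
# Hypothetical inputs, shown only to illustrate the key format.
key = CacheKeyGenerator.generate(
    "code-review",                        # skill name
    {"file": "src/app.py", "depth": 2},   # parameters
    "v3"                                  # context version
)
print(key)  # e.g. "code-review:<md5 of parameters>:v3"
```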

### 2. Parallel Execution

#### 2.1 Task Parallelism

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed


class ParallelSkillExecutor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def execute_parallel(self, tasks):
        futures = []
        for task in tasks:
            future = self.executor.submit(task.execute)
            futures.append(future)

        results = []
        # Results come back in completion order; as_completed blocks the
        # calling thread while it waits for each future to finish.
        for future in as_completed(futures):
            result = await asyncio.wrap_future(future)
            results.append(result)

        return results

    async def execute_parallel_with_order(self, tasks):
        # Results come back in the same order as the submitted tasks.
        loop = asyncio.get_event_loop()
        futures = [loop.run_in_executor(self.executor, task.execute) for task in tasks]
        results = await asyncio.gather(*futures)
        return results
```
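
A hedged usage sketch: the executor expects task objects with a synchronous `execute()` method, so the `ReviewTask` class below is a stand-in invented for this example.

```python
import asyncio

# Hypothetical task objects: anything exposing a synchronous execute() works.
class ReviewTask:
    def __init__(self, file_path):
        self.file_path = file_path

    def execute(self):
        return f"reviewed {self.file_path}"

async def main():
    executor = ParallelSkillExecutor(max_workers=4)
    tasks = [ReviewTask("a.py"), ReviewTask("b.py")]
    ordered = await executor.execute_parallel_with_order(tasks)
    print(ordered)  # ['reviewed a.py', 'reviewed b.py']

asyncio.run(main())
```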

#### 2.2 Pipeline Execution

```python
import asyncio


class PipelineExecutor:
    def __init__(self, stages):
        self.stages = stages
        # One queue per stage boundary, plus one for final results.
        self.queues = [asyncio.Queue() for _ in range(len(stages) + 1)]

    async def add_task(self, task):
        await self.queues[0].put(task)

    async def worker(self, stage_index):
        # Each worker pulls from its input queue, processes the item, and
        # forwards the result to the next stage's queue.
        while True:
            task = await self.queues[stage_index].get()
            result = await self.stages[stage_index].process(task)
            await self.queues[stage_index + 1].put(result)

    async def run(self):
        workers = []
        for i in range(len(self.stages)):
            worker = asyncio.create_task(self.worker(i))
            workers.append(worker)
        await asyncio.gather(*workers)
```
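
A minimal way to drive the pipeline, assuming each stage exposes an async `process()` method; the `Parse` and `Analyze` stages here are invented for illustration. The workers loop forever, so the driver cancels them once the result has been read.

```python
import asyncio

# Illustrative stages; any object with an async process(item) method works.
class Parse:
    async def process(self, item):
        return item.strip()

class Analyze:
    async def process(self, item):
        return {"source": item, "length": len(item)}

async def main():
    pipeline = PipelineExecutor([Parse(), Analyze()])
    runner = asyncio.create_task(pipeline.run())   # workers loop forever
    await pipeline.add_task("  def handler(): ...  ")
    result = await pipeline.queues[-1].get()       # read from the final queue
    print(result)
    runner.cancel()                                # stop the workers

asyncio.run(main())
```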

### 3. Incremental Processing

#### 3.1 Change Detection

```python
import hashlib


class ChangeDetector:
    def __init__(self):
        self.file_hashes = {}
        self.context_hashes = {}

    def detect_file_changes(self, files):
        changes = []
        for file in files:
            current_hash = self.calculate_file_hash(file)
            previous_hash = self.file_hashes.get(file)

            if previous_hash is None:
                changes.append({"type": "added", "file": file})
            elif current_hash != previous_hash:
                changes.append({"type": "modified", "file": file})

            self.file_hashes[file] = current_hash

        # Files seen on a previous run but missing from the current set were deleted.
        for file in list(self.file_hashes):
            if file not in files:
                changes.append({"type": "deleted", "file": file})
                del self.file_hashes[file]

        return changes

    def calculate_file_hash(self, file_path):
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
```

#### 3.2 Incremental Updates

```python
import copy


class IncrementalUpdater:
    def __init__(self):
        self.previous_context = None
        self.previous_results = None

    async def update(self, new_context, changes):
        # full_process() and process_change() are Skill-specific hooks (not shown).
        if self.previous_context is None:
            return await self.full_process(new_context)

        # Reprocess only the parts that changed.
        updated_results = copy.deepcopy(self.previous_results)
        for change in changes:
            if change["type"] == "modified":
                result = await self.process_change(change)
                updated_results[change["file"]] = result
            elif change["type"] == "added":
                result = await self.process_change(change)
                updated_results[change["file"]] = result
            elif change["type"] == "deleted":
                del updated_results[change["file"]]

        self.previous_context = new_context
        self.previous_results = updated_results
        return updated_results
```

### 4. Resource Optimization

#### 4.1 Memory Management

```python
class MemoryManager:
    def __init__(self, max_memory_mb=1024):
        self.max_memory = max_memory_mb * 1024 * 1024
        self.current_memory = 0
        self.allocations = {}

    def allocate(self, key, size):
        # Evict existing allocations if this one would exceed the budget.
        if self.current_memory + size > self.max_memory:
            self.free_memory(size)

        self.allocations[key] = size
        self.current_memory += size

    def free_memory(self, required_size):
        # Drop allocations (in insertion order) until enough space is freed.
        freed = 0
        for key in list(self.allocations.keys()):
            if freed >= required_size:
                break
            freed += self.allocations[key]
            del self.allocations[key]

        self.current_memory -= freed

    def get_memory_usage(self):
        # Current usage in megabytes.
        return self.current_memory / (1024 * 1024)
```

#### 4.2 Connection Pooling

```python
import asyncio


class ConnectionPool:
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.connections = []
        self.available = asyncio.Queue()
        self.lock = asyncio.Lock()

    async def acquire(self):
        # Reuse an idle connection when one is available.
        if not self.available.empty():
            return await self.available.get()

        # Otherwise open a new connection, up to the configured limit.
        async with self.lock:
            if len(self.connections) < self.max_connections:
                conn = await self.create_connection()
                self.connections.append(conn)
                return conn

        # At the limit: wait for a connection to be released.
        return await self.available.get()

    async def release(self, connection):
        await self.available.put(connection)

    async def create_connection(self):
        # connect_to_service() is a placeholder for the real connection setup.
        return await self.connect_to_service()
```
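
A sketch of the usual acquire/try/finally/release pattern around the pool; the `fetch_metadata` call on the connection is hypothetical.

```python
# Illustrative acquire/release pattern (fetch_metadata is a hypothetical call).
async def fetch_skill_metadata(pool, skill_name):
    conn = await pool.acquire()
    try:
        return await conn.fetch_metadata(skill_name)
    finally:
        await pool.release(conn)   # always hand the connection back
```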

### 5. Algorithm Optimization

#### 5.1 Complexity Analysis

```markdown
## Common algorithm complexities

### O(1) - constant time
- Array index access
- Hash table lookup
- Stack push/pop

### O(log n) - logarithmic time
- Binary search
- Balanced tree operations
- Heap operations

### O(n) - linear time
- Array traversal
- Linked list traversal
- Finding a minimum or maximum

### O(n log n) - linearithmic time
- Quicksort (average case)
- Merge sort
- Heapsort

### O(n²) - quadratic time
- Bubble sort
- Insertion sort
- Nested loops over the same input
```
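
To make the table concrete, here is a small sketch of turning an O(n·m) membership scan into roughly O(n + m) by switching from a list to a set; the data and function names are invented.

```python
# O(n * m): list membership inside a loop rescans `known_symbols` every time.
def find_unknown_slow(symbols, known_symbols):
    return [s for s in symbols if s not in known_symbols]   # linear scan per item

# Roughly O(n + m): build a set once, then each lookup is O(1) on average.
def find_unknown_fast(symbols, known_symbols):
    known = set(known_symbols)
    return [s for s in symbols if s not in known]
```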

#### 5.2 Optimization Example

```python
class OptimizedCodeAnalyzer:
    def __init__(self):
        self.symbol_index = {}
        self.dependency_graph = {}

    def analyze(self, code):
        # Use an index to speed up symbol lookups.
        # extract_symbols() / extract_dependencies() are parser hooks (not shown).
        symbols = self.extract_symbols(code)
        for symbol in symbols:
            if symbol not in self.symbol_index:
                self.symbol_index[symbol] = []
            self.symbol_index[symbol].append(code)

        # Represent dependency relationships as a graph.
        dependencies = self.extract_dependencies(code)
        for dep in dependencies:
            if dep not in self.dependency_graph:
                self.dependency_graph[dep] = []
            self.dependency_graph[dep].append(code)

    def find_usages(self, symbol):
        # O(1) average-case dictionary lookup
        return self.symbol_index.get(symbol, [])

    def find_dependents(self, code):
        # O(1) average-case dictionary lookup
        return self.dependency_graph.get(code, [])
```

### 6. Data Structure Optimization

#### 6.1 Choosing a Data Structure

```markdown
## Data structure selection guide

### Array
- Good for: random access, fixed size
- Poor for: frequent insertion/deletion

### Linked list
- Good for: frequent insertion/deletion
- Poor for: random access

### Hash table
- Good for: fast lookups, key-value data
- Poor for: ordered traversal

### Tree
- Good for: ordered data, hierarchical structure
- Poor for: simple flat data

### Graph
- Good for: relational data, network structure
- Poor for: simple linear data
```
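
As one concrete illustration of the guide, a queue-like workload is better served by `collections.deque` than by a plain list, because removing from the front of a list shifts every remaining element.

```python
# list.pop(0) is O(n) per removal; collections.deque.popleft() is O(1),
# so a deque suits queue-like workloads such as a file-processing backlog.
from collections import deque

pending_files = deque(["a.py", "b.py", "c.py"])
while pending_files:
    current = pending_files.popleft()   # O(1) removal from the front
    print(f"processing {current}")
```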

#### 6.2 Optimized Implementation

```python
from collections import defaultdict


class OptimizedContextStore:
    def __init__(self):
        self.file_index = {}                              # file -> content
        self.symbol_index = defaultdict(list)             # symbol -> list of files
        self.dependency_graph = defaultdict(set)          # file -> files it depends on
        self.reverse_dependency_graph = defaultdict(set)  # file -> files that depend on it

    def add_file(self, file_path, content):
        self.file_index[file_path] = content

        # extract_symbols() / extract_dependencies() are parser hooks (not shown).
        symbols = self.extract_symbols(content)
        for symbol in symbols:
            self.symbol_index[symbol].append(file_path)

        dependencies = self.extract_dependencies(content)
        for dep in dependencies:
            self.dependency_graph[file_path].add(dep)
            self.reverse_dependency_graph[dep].add(file_path)

    def get_files_by_symbol(self, symbol):
        return self.symbol_index.get(symbol, [])

    def get_dependents(self, file_path):
        return self.reverse_dependency_graph.get(file_path, set())

    def get_dependencies(self, file_path):
        return self.dependency_graph.get(file_path, set())
```
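
A small usage sketch; since `extract_symbols()` and `extract_dependencies()` are left abstract above, they are stubbed out here purely for illustration.

```python
# Illustrative usage; the extraction hooks are naive stubs for this example.
class DemoContextStore(OptimizedContextStore):
    def extract_symbols(self, content):
        return [tok for tok in content.split() if tok.isidentifier()]

    def extract_dependencies(self, content):
        return [line.split()[1] for line in content.splitlines()
                if line.startswith("import ")]

store = DemoContextStore()
store.add_file("app.py", "import utils\nhandler = None")
print(store.get_files_by_symbol("handler"))   # ['app.py']
print(store.get_dependents("utils"))          # {'app.py'}
```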

## Performance Monitoring

### 1. Metric Collection

#### 1.1 Performance Metrics

```python
from collections import defaultdict
from datetime import datetime


class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_execution_time(self, skill_name, duration):
        self.metrics[f"{skill_name}_execution_time"].append({
            "value": duration,
            "timestamp": datetime.now()
        })

    def record_memory_usage(self, skill_name, memory_mb):
        self.metrics[f"{skill_name}_memory_usage"].append({
            "value": memory_mb,
            "timestamp": datetime.now()
        })

    def record_cache_hit_rate(self, cache_name, hit_rate):
        self.metrics[f"{cache_name}_hit_rate"].append({
            "value": hit_rate,
            "timestamp": datetime.now()
        })

    def get_average(self, metric_name):
        values = [m["value"] for m in self.metrics[metric_name]]
        return sum(values) / len(values) if values else 0

    def get_percentile(self, metric_name, percentile):
        values = sorted([m["value"] for m in self.metrics[metric_name]])
        if not values:
            return 0
        # Clamp the index so the 100th percentile maps to the last element.
        index = min(int(len(values) * percentile / 100), len(values) - 1)
        return values[index]
```
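
An illustrative use of the monitor; the metric values below are invented.

```python
# Hypothetical timings for a "code-review" skill.
monitor = PerformanceMonitor()
for duration in (0.8, 1.2, 0.9, 3.4, 1.1):
    monitor.record_execution_time("code-review", duration)

print(monitor.get_average("code-review_execution_time"))         # 1.48
print(monitor.get_percentile("code-review_execution_time", 95))  # 3.4
```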

#### 1.2 Real-Time Monitoring

```python
import asyncio

import psutil  # third-party package for system metrics


class RealTimeMonitor:
    def __init__(self, interval=1):
        self.interval = interval
        self.running = False
        self.callbacks = []

    def add_callback(self, callback):
        self.callbacks.append(callback)

    async def start(self):
        # Poll system metrics on a fixed interval and fan them out to callbacks.
        self.running = True
        while self.running:
            metrics = self.collect_metrics()
            for callback in self.callbacks:
                await callback(metrics)
            await asyncio.sleep(self.interval)

    def stop(self):
        self.running = False

    def collect_metrics(self):
        return {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_io": psutil.disk_io_counters(),
            "network_io": psutil.net_io_counters()
        }
```

### 2. Performance Analysis

#### 2.1 Bottleneck Identification

```python
from collections import defaultdict


class BottleneckAnalyzer:
    def __init__(self):
        self.execution_times = defaultdict(list)
        self.call_counts = defaultdict(int)

    def record_call(self, skill_name, duration):
        self.execution_times[skill_name].append(duration)
        self.call_counts[skill_name] += 1

    def analyze(self):
        report = {}

        for skill_name in self.execution_times:
            times = self.execution_times[skill_name]
            call_count = self.call_counts[skill_name]

            report[skill_name] = {
                "total_time": sum(times),
                "average_time": sum(times) / len(times),
                "max_time": max(times),
                "min_time": min(times),
                "call_count": call_count,
                "time_percentage": self.calculate_percentage(skill_name)
            }

        return report

    def calculate_percentage(self, skill_name):
        total_time = sum(sum(times) for times in self.execution_times.values())
        skill_time = sum(self.execution_times[skill_name])
        return (skill_time / total_time * 100) if total_time > 0 else 0
```
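
One convenient way to feed the analyzer is a small timing context manager. The helper below is a sketch, not part of Claude Code, and the skill name is invented.

```python
# Sketch: time a block of work and record the duration for the given skill.
import time
from contextlib import contextmanager

analyzer = BottleneckAnalyzer()

@contextmanager
def timed(skill_name):
    start = time.perf_counter()
    try:
        yield
    finally:
        analyzer.record_call(skill_name, time.perf_counter() - start)

with timed("doc-generation"):
    sum(range(1_000_000))   # stand-in for real Skill work

print(analyzer.analyze()["doc-generation"]["call_count"])  # 1
```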

#### 2.2 Performance Reports

```python
from datetime import datetime


class PerformanceReporter:
    def __init__(self):
        self.analyzer = BottleneckAnalyzer()

    def generate_report(self):
        analysis = self.analyzer.analyze()

        report = []
        report.append("# Performance Report")
        report.append(f"Generated: {datetime.now()}")
        report.append("")
        report.append("## Execution Summary")

        # List the slowest skills first.
        for skill_name, metrics in sorted(
            analysis.items(),
            key=lambda x: x[1]["total_time"],
            reverse=True
        ):
            report.append(f"\n### {skill_name}")
            report.append(f"- Total Time: {metrics['total_time']:.2f}s")
            report.append(f"- Average Time: {metrics['average_time']:.2f}s")
            report.append(f"- Max Time: {metrics['max_time']:.2f}s")
            report.append(f"- Call Count: {metrics['call_count']}")
            report.append(f"- Time Percentage: {metrics['time_percentage']:.2f}%")

        return "\n".join(report)
```

## Optimization in Practice

### 1. Code Review Optimization

#### 1.1 Optimization Strategies

```markdown
## Code Review Optimization

### Incremental review
- Review only the files that changed
- Reuse earlier review results
- Detect changes with file hashes

### Parallel review
- Review multiple files in parallel
- Speed things up with a thread pool
- Allocate resources sensibly

### Result caching
- Cache review results
- Use multi-level caching
- Set a reasonable TTL

### Smart filtering (see the sketch below)
- Filter out irrelevant files
- Review important files first
- Drive the filtering with a rule engine
```
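
A minimal sketch of the smart-filtering idea referenced above: drop generated or vendored paths and move high-priority files to the front of the review queue. The ignore patterns and priority rules are assumptions, not Claude Code defaults.

```python
import fnmatch

# Hypothetical rules: which paths to skip and which to review first.
IGNORE_PATTERNS = ["*/node_modules/*", "*.min.js", "*/__pycache__/*"]
HIGH_PRIORITY_PATTERNS = ["*/auth/*", "*/payments/*"]

def filter_and_prioritize(files):
    relevant = [f for f in files
                if not any(fnmatch.fnmatch(f, p) for p in IGNORE_PATTERNS)]
    # Important files first, everything else afterwards.
    return sorted(relevant,
                  key=lambda f: not any(fnmatch.fnmatch(f, p)
                                        for p in HIGH_PRIORITY_PATTERNS))

print(filter_and_prioritize(["src/auth/login.py", "dist/app.min.js", "src/util.py"]))
# ['src/auth/login.py', 'src/util.py']
```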

#### 1.2 Implementation Example

```python
import asyncio


class OptimizedCodeReview:
    def __init__(self):
        self.cache = MultiLevelCache()
        self.change_detector = ChangeDetector()
        self.executor = ParallelSkillExecutor()

    async def review(self, files):
        # Detect which files changed since the last run.
        changes = self.change_detector.detect_file_changes(files)

        # Review only added or modified files.
        changed_files = [c["file"] for c in changes if c["type"] in ["added", "modified"]]

        # review_file() is a coroutine, so gather runs the reviews concurrently.
        tasks = [self.review_file(file) for file in changed_files]
        results = await asyncio.gather(*tasks)
        return results

    async def review_file(self, file_path):
        # Check the cache first.
        # generate_cache_key() and execute_review() are Skill-specific hooks (not shown).
        cache_key = self.generate_cache_key(file_path)
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            return cached_result

        # Run the review, then cache the result.
        result = await self.execute_review(file_path)
        await self.cache.set(cache_key, result)
        return result
```

### 2. Documentation Generation Optimization

#### 2.1 Optimization Strategies

```markdown
## Documentation Generation Optimization

### Incremental generation
- Regenerate only the documents that changed
- Reuse the unchanged parts
- Use a diff algorithm to find what changed

### Template caching
- Cache documentation templates
- Pre-compile templates
- Use an efficient template engine

### Batch processing
- Generate documents in batches
- Reduce file I/O
- Use streaming writes

### Compressed output
- Compress the generated documents
- Use an efficient compression algorithm
- Support streaming compression
```

#### 2.2 Implementation Example

```python
class OptimizedDocGenerator:
    def __init__(self):
        # LRUCache can come from `cachetools`; DiffCalculator is an assumed helper.
        self.template_cache = LRUCache(maxsize=50)
        self.diff_calculator = DiffCalculator()

    async def generate_docs(self, files, previous_docs=None):
        if previous_docs is None:
            return await self.full_generate(files)

        # Work out which files changed since the previous run.
        diffs = self.diff_calculator.calculate_diffs(files, previous_docs)

        # Regenerate only the changed documents; reuse the rest.
        results = {}
        for file_path, diff in diffs.items():
            if diff["changed"]:
                results[file_path] = await self.generate_doc(file_path)
            else:
                results[file_path] = previous_docs[file_path]
        return results

    async def generate_doc(self, file_path):
        # Fetch the template, then render the document from it.
        template = await self.get_template(file_path)
        content = await self.render_template(template, file_path)
        return content

    async def get_template(self, file_path):
        template_name = self.get_template_name(file_path)
        if template_name in self.template_cache:
            return self.template_cache[template_name]

        template = await self.load_template(template_name)
        self.template_cache[template_name] = template
        return template
```

### 3. Test Generation Optimization

#### 3.1 Optimization Strategies

```markdown
## Test Generation Optimization

### Smart analysis
- Analyze code coverage
- Identify untested paths
- Generate the most important tests first

### Test deduplication
- Detect duplicate tests
- Merge similar tests
- Trim the test suite

### Parallel execution
- Run tests in parallel
- Use test sharding
- Optimize test ordering

### Result caching
- Cache test results
- Use snapshots
- Support incremental test runs
```

#### 3.2 Implementation Example

```python
class OptimizedTestGenerator:
    def __init__(self):
        # CoverageAnalyzer, TestDeduplicator and ResultCache are assumed helpers.
        self.coverage_analyzer = CoverageAnalyzer()
        self.test_deduplicator = TestDeduplicator()
        self.result_cache = ResultCache()

    async def generate_tests(self, files):
        # Analyze coverage to find code that is not yet tested.
        coverage = await self.coverage_analyzer.analyze(files)
        untested_code = coverage.get_untested_code()

        # Generate a test for each untested piece of code.
        tests = []
        for code in untested_code:
            test = await self.generate_test(code)
            tests.append(test)

        # Remove duplicate or near-duplicate tests.
        unique_tests = self.test_deduplicator.deduplicate(tests)
        return unique_tests

    async def generate_test(self, code):
        # Check the cache first.
        cache_key = self.generate_cache_key(code)
        cached_test = await self.result_cache.get(cache_key)
        if cached_test:
            return cached_test

        # Generate the test and cache it.
        test = await self.create_test(code)
        await self.result_cache.set(cache_key, test)
        return test
```
