Skip to content

25.6 智慧維護

智慧維護概述

Claude Code 可以自動化維護任務、監控系統健康狀態、預測潛在問題,並提供維護最佳化建議。透過智慧維護功能,開發者可以降低維護成本、提高系統可靠性。

## 自動化維護

### 1. 依賴管理

示例:自動化依賴管理

使用者請求: "為專案實現自動化依賴管理"

Claude Code 生成的程式碼:

#### requirements.txt

    Flask-SQLAlchemy==3.0.5
    Flask-JWT-Extended==4.5.2
    Flask-Mail==0.9.1
    Flask-Migrate==4.0.5
    gunicorn==21.2.0
    psycopg2-binary==2.9.7
    redis==4.6.0
    celery==5.3.4
    python-dotenv==1.0.0

#### requirements-dev.txt

    pytest==7.4.2
    pytest-cov==4.1.0
    pytest-mock==3.11.1
    black==23.9.1
    flake8==6.1.0
    mypy==1.5.1
    isort==5.12.0
    pre-commit==3.4.0

#### 依賴更新指令碼

    #!/usr/bin/env python3
    """
    依赖更新脚本
    自动检查并更新过期的依赖包
    """

    import subprocess
    import json
    from typing import List, Dict
    import sys

    def get_outdated_packages() -> List[Dict]:
        """Return the installed packages for which pip reports a newer release.

        Runs ``pip list --outdated --format=json`` through the interpreter
        that is executing this script, so the report describes this
        environment rather than whichever ``pip`` is first on PATH.

        Returns:
            The JSON document printed by pip, parsed into Python objects.
            An empty list is returned when pip cannot be started, exits with
            a non-zero status, or prints something that is not valid JSON.
        """
        try:
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'list', '--outdated', '--format=json'],
                capture_output=True,
                text=True
            )
        except OSError as e:
            print(f"Error: Failed to get outdated packages: {e}")
            return []

        if result.returncode != 0:
            print("Error: Failed to get outdated packages")
            return []

        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as e:
            # Anything other than JSON on stdout means the report is unusable.
            print(f"Error: Failed to get outdated packages: {e}")
            return []

    def check_compatibility(package: str, version: str) -> bool:
        """Report whether ``pip install --dry-run package==version`` succeeds.

        The command is run through the current interpreter's pip so the dry
        run is evaluated against this environment.

        Args:
            package: Distribution name.
            version: Exact version to test.

        Returns:
            True when pip exits with status 0, False when it exits with any
            other status or cannot be started.
        """
        try:
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', '--dry-run', f'{package}=={version}'],
                capture_output=True,
                text=True
            )
        except OSError as e:
            print(f"Error checking compatibility for {package}: {e}")
            return False

        return result.returncode == 0

    def update_package(package: str, version: str) -> bool:
        """Install ``package==version`` with ``pip install --upgrade``.

        The command is run through the current interpreter's pip so the
        package is installed into the environment this script runs in.

        Args:
            package: Distribution name.
            version: Exact version to install.

        Returns:
            True when pip exits with status 0, False when it exits with any
            other status or cannot be started.
        """
        try:
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', '--upgrade', f'{package}=={version}'],
                capture_output=True,
                text=True
            )
        except OSError as e:
            print(f"Error updating {package}: {e}")
            return False

        return result.returncode == 0

    def update_requirements():
        """Rewrite requirements.txt with the output of ``pip freeze``.

        The file is opened only after pip has exited successfully, so a
        failed freeze no longer replaces an existing requirements.txt with
        empty content.

        Returns:
            True when the file was written, False when pip failed or the
            file could not be written.
        """
        # NOTE(review): `pip freeze` lists every installed distribution, so
        # the rewritten file is not limited to the packages originally listed
        # in requirements.txt - confirm that this is intended.
        try:
            result = subprocess.run(
                [sys.executable, '-m', 'pip', 'freeze'],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                print(f"Error updating requirements.txt: {result.stderr.strip()}")
                return False

            with open('requirements.txt', 'w', encoding='utf-8') as f:
                f.write(result.stdout)

            return True
        except OSError as e:
            print(f"Error updating requirements.txt: {e}")
            return False

    def main():
        """Interactively upgrade every outdated package that passes a dry run.

        Steps: list the outdated packages, dry-run each upgrade, ask for
        confirmation, install the compatible upgrades, and rewrite
        requirements.txt when at least one upgrade succeeded.
        """
        print("检查过期依赖...")

        outdated_packages = get_outdated_packages()

        if not outdated_packages:
            print("所有依赖都是最新的!")
            return

        print(f"\n发现 {len(outdated_packages)} 个过期包:")
        for pkg in outdated_packages:
            print(f"  - {pkg['name']}: {pkg['version']} -> {pkg['latest_version']}")

        print("\n检查兼容性...")
        compatible_packages = []

        for pkg in outdated_packages:
            package_name = pkg['name']
            latest_version = pkg['latest_version']

            if check_compatibility(package_name, latest_version):
                compatible_packages.append(pkg)
                print(f"  ✓ {package_name} {latest_version}")
            else:
                print(f"  ✗ {package_name} {latest_version} (不兼容)")

        if not compatible_packages:
            print("\n没有可更新的包")
            return

        print(f"\n准备更新 {len(compatible_packages)} 个包...")
        response = input("是否继续?(y/n): ")

        # Surrounding whitespace in the answer must not turn "y" into a refusal.
        if response.strip().lower() != 'y':
            print("取消更新")
            return

        print("\n开始更新...")
        updated_count = 0
        for pkg in compatible_packages:
            package_name = pkg['name']
            latest_version = pkg['latest_version']

            # Name and target version are separated so the line is readable.
            print(f"更新 {package_name} {latest_version}...")
            if update_package(package_name, latest_version):
                updated_count += 1
                print(f"  ✓ {package_name} 更新成功")
            else:
                print(f"  ✗ {package_name} 更新失败")

        # Nothing changed in the environment, so leave requirements.txt alone.
        if updated_count:
            print("\n更新 requirements.txt...")
            if update_requirements():
                print("  ✓ requirements.txt 更新成功")
            else:
                print("  ✗ requirements.txt 更新失败")

        print("\n依赖更新完成!")

    if __name__ == '__main__':
        main()

#### 依賴安全檢查

    #!/usr/bin/env python3
    """
    依赖安全检查脚本
    检查依赖包的安全漏洞
    """

    import subprocess
    import json
    from typing import List, Dict

    def check_security() -> List[Dict]:
        """Audit the installed packages with the ``pip-audit`` tool.

        ``pip audit`` is not a pip subcommand; the auditor is the separate
        ``pip-audit`` executable, which must be asked for JSON explicitly.
        Its exit status is non-zero when vulnerabilities are found, so the
        status is not used to detect failure - the output is parsed instead.

        Returns:
            One dict per vulnerability with the keys ``name``, ``version``,
            ``advisory``, ``severity`` and ``fix_versions``.  An empty list
            is returned when nothing was found, when pip-audit cannot be
            started, or when its output is not valid JSON.
        """
        try:
            result = subprocess.run(
                ['pip-audit', '--format', 'json'],
                capture_output=True,
                text=True
            )
        except OSError as e:
            print(f"Error checking security: {e}")
            return []

        try:
            report = json.loads(result.stdout)
        except json.JSONDecodeError:
            print("Error: Failed to check security")
            return []

        # The report is either a bare list of dependencies or an object that
        # wraps the list under "dependencies"; accept both shapes.
        if isinstance(report, dict):
            dependencies = report.get('dependencies', [])
        else:
            dependencies = report

        vulnerabilities = []
        for dependency in dependencies:
            for vuln in dependency.get('vulns', []):
                advisory = vuln.get('id', '')
                description = vuln.get('description')
                if description:
                    advisory = f"{advisory}: {description}"

                vulnerabilities.append({
                    'name': dependency.get('name'),
                    'version': dependency.get('version'),
                    'advisory': advisory,
                    # NOTE(review): pip-audit does not appear to report a
                    # severity; the key is kept so existing callers work.
                    'severity': vuln.get('severity', 'unknown'),
                    'fix_versions': vuln.get('fix_versions', [])
                })

        return vulnerabilities

    def main():
        """Run the security check and print every reported vulnerability."""
        print("检查依赖安全性...")

        findings = check_security()

        if findings:
            print(f"\n发现 {len(findings)} 个安全漏洞:")
            for finding in findings:
                # One five-line record per vulnerability, blank line first.
                print(f"\n  包: {finding['name']}")
                print(f"  版本: {finding['version']}")
                print(f"  漏洞: {finding['advisory']}")
                print(f"  严重性: {finding['severity']}")
                print(f"  修复版本: {finding['fix_versions']}")
        else:
            print("未发现安全漏洞!")

    if __name__ == '__main__':
        main()

### 2. 資料庫維護

示例:自動化資料庫維護

使用者請求: "為應用實現自動化資料庫維護"

Claude Code 生成的程式碼:

    from datetime import datetime, timedelta
    from sqlalchemy import text
    from app import db
    import logging

    logger = logging.getLogger(__name__)

    class DatabaseMaintenance:
        """Housekeeping tasks for a PostgreSQL database reached through a session.

        The object passed to the constructor must provide ``execute``,
        ``commit``, ``rollback`` and ``get_bind``.  The tables and indexes
        that are maintained are fixed and listed in the class attributes.
        Every method logs failures instead of raising them.
        """

        # Tables visited by analyze_tables() and vacuum_tables().
        TABLES = ('users', 'products', 'orders', 'payments')

        # Indexes rebuilt by rebuild_indexes().
        INDEXES = (
            'idx_users_username',
            'idx_users_email',
            'idx_orders_user_id',
            'idx_orders_status',
            'idx_products_name',
        )

        def __init__(self, db_session):
            """Keep the session that every maintenance statement is sent through."""
            self.db = db_session

        def _execute_autocommit(self, statement):
            """Run *statement* on its own connection in AUTOCOMMIT mode.

            PostgreSQL refuses VACUUM inside a transaction block, and
            statements sent through the session run inside one, so a
            separate connection is opened from the session's bind.
            """
            # NOTE(review): assumes get_bind() returns an Engine - confirm for
            # sessions that are bound directly to a Connection.
            engine = self.db.get_bind()
            with engine.connect().execution_options(
                isolation_level="AUTOCOMMIT"
            ) as connection:
                connection.execute(text(statement))

        def _query_rows(self, sql, keys):
            """Execute *sql* and return each row as a dict keyed by *keys*."""
            result = self.db.execute(text(sql))
            return [dict(zip(keys, row)) for row in result]

        def analyze_tables(self):
            """Run ANALYZE on every table in TABLES, committing after each one."""
            for table in self.TABLES:
                try:
                    self.db.execute(text(f"ANALYZE {table}"))
                    self.db.commit()
                    logger.info(f"Table {table} analyzed successfully")
                except Exception as e:
                    # A failed statement aborts the open transaction; roll it
                    # back so the remaining tables can still be processed.
                    self.db.rollback()
                    logger.error(f"Error analyzing table {table}: {e}")

        def vacuum_tables(self):
            """Run VACUUM ANALYZE on every table in TABLES, outside a transaction."""
            for table in self.TABLES:
                try:
                    self._execute_autocommit(f"VACUUM ANALYZE {table}")
                    logger.info(f"Table {table} vacuumed successfully")
                except Exception as e:
                    logger.error(f"Error vacuuming table {table}: {e}")

        def rebuild_indexes(self):
            """Run REINDEX INDEX on every index in INDEXES, committing after each one."""
            for index in self.INDEXES:
                try:
                    self.db.execute(text(f"REINDEX INDEX {index}"))
                    self.db.commit()
                    logger.info(f"Index {index} rebuilt successfully")
                except Exception as e:
                    # Same recovery as analyze_tables(): clear the aborted
                    # transaction before moving on to the next index.
                    self.db.rollback()
                    logger.error(f"Error rebuilding index {index}: {e}")

        def clean_old_logs(self, days=30):
            """Delete rows of ``logs`` whose ``created_at`` is older than *days* days.

            Returns:
                The number of deleted rows, or 0 when the delete failed and
                was rolled back.
            """
            cutoff_date = datetime.utcnow() - timedelta(days=days)

            try:
                result = self.db.execute(
                    text("""
                        DELETE FROM logs
                        WHERE created_at < :cutoff_date
                    """),
                    {'cutoff_date': cutoff_date}
                )

                deleted_count = result.rowcount
                self.db.commit()

                # Logged after the commit so the message is only written for
                # deletions that actually became permanent.
                logger.info(f"Deleted {deleted_count} old log entries")

                return deleted_count
            except Exception as e:
                logger.error(f"Error cleaning old logs: {e}")
                self.db.rollback()
                return 0

        def clean_old_sessions(self, days=7):
            """Delete rows of ``sessions`` whose ``expires_at`` is older than *days* days.

            Returns:
                The number of deleted rows, or 0 when the delete failed and
                was rolled back.
            """
            cutoff_date = datetime.utcnow() - timedelta(days=days)

            try:
                result = self.db.execute(
                    text("""
                        DELETE FROM sessions
                        WHERE expires_at < :cutoff_date
                    """),
                    {'cutoff_date': cutoff_date}
                )

                deleted_count = result.rowcount
                self.db.commit()

                logger.info(f"Deleted {deleted_count} old sessions")

                return deleted_count
            except Exception as e:
                logger.error(f"Error cleaning old sessions: {e}")
                self.db.rollback()
                return 0

        def optimize_database(self):
            """Run analyze_tables(), vacuum_tables() and rebuild_indexes() in order."""
            try:
                self.analyze_tables()
                self.vacuum_tables()
                self.rebuild_indexes()
                logger.info("Database optimization completed successfully")
            except Exception as e:
                logger.error(f"Error optimizing database: {e}")

        def get_database_stats(self):
            """Collect table sizes, index usage and the slowest statements.

            Each of the three queries is attempted independently; a query
            that fails is logged and rolled back, and its entry stays empty.

            Returns:
                A dict that always contains the keys ``table_sizes``,
                ``index_usage`` and ``slow_queries``, each mapped to a list
                of row dicts.
            """
            stats = {'table_sizes': [], 'index_usage': [], 'slow_queries': []}

            # Table sizes.  quote_ident keeps names that need quoting valid
            # when they are turned back into a relation reference.
            try:
                stats['table_sizes'] = self._query_rows(
                    """
                    SELECT
                        schemaname,
                        tablename,
                        pg_size_pretty(pg_total_relation_size(
                            quote_ident(schemaname) || '.' || quote_ident(tablename)
                        )) AS size
                    FROM pg_tables
                    WHERE schemaname = 'public'
                    ORDER BY pg_total_relation_size(
                        quote_ident(schemaname) || '.' || quote_ident(tablename)
                    ) DESC
                    """,
                    ('schema', 'table', 'size')
                )
            except Exception as e:
                self.db.rollback()
                logger.error(f"Error getting database stats: {e}")

            # Index usage.  pg_stat_user_indexes names its columns relname and
            # indexrelname; tablename/indexname belong to pg_indexes.
            try:
                stats['index_usage'] = self._query_rows(
                    """
                    SELECT
                        schemaname,
                        relname,
                        indexrelname,
                        idx_scan,
                        idx_tup_read,
                        idx_tup_fetch
                    FROM pg_stat_user_indexes
                    ORDER BY idx_scan DESC
                    """,
                    ('schema', 'table', 'index', 'scans',
                     'tuples_read', 'tuples_fetched')
                )
            except Exception as e:
                self.db.rollback()
                logger.error(f"Error getting database stats: {e}")

            # Slow queries.  The timing columns of pg_stat_statements were
            # renamed in PostgreSQL 13, so the new names are tried first and
            # the old ones second.
            slow_query_sql = """
                SELECT
                    query,
                    calls,
                    {total},
                    {mean},
                    {max}
                FROM pg_stat_statements
                ORDER BY {mean} DESC
                LIMIT 10
            """
            timing_columns = (
                {'total': 'total_exec_time', 'mean': 'mean_exec_time',
                 'max': 'max_exec_time'},
                {'total': 'total_time', 'mean': 'mean_time',
                 'max': 'max_time'},
            )
            last_error = None
            for columns in timing_columns:
                try:
                    stats['slow_queries'] = self._query_rows(
                        slow_query_sql.format(**columns),
                        ('query', 'calls', 'total_time', 'mean_time', 'max_time')
                    )
                    last_error = None
                    break
                except Exception as e:
                    self.db.rollback()
                    last_error = e

            if last_error is not None:
                logger.error(f"Error getting database stats: {last_error}")

            return stats

    def run_maintenance():
        """Run one full maintenance pass and print a short summary.

        The pass optimizes the database, deletes log rows older than 30 days
        and session rows older than 7 days, then prints how many tables,
        indexes and slow queries the statistics report contains.
        """
        maintenance = DatabaseMaintenance(db.session)

        print("開始資料庫維護...")

        # Optimize the database
        print("最佳化資料庫...")
        maintenance.optimize_database()

        # Remove stale data
        print("清理舊日誌...")
        maintenance.clean_old_logs(days=30)

        print("清理舊會話...")
        maintenance.clean_old_sessions(days=7)

        # Collect statistics
        print("獲取資料庫統計資訊...")
        stats = maintenance.get_database_stats()

        # A failed statistics query can leave a key out of the report, so
        # every section is read with an empty default instead of indexing.
        print("\n資料庫統計資訊:")
        print(f"  表數量: {len(stats.get('table_sizes', []))}")
        print(f"  索引數量: {len(stats.get('index_usage', []))}")
        print(f"  慢查詢數量: {len(stats.get('slow_queries', []))}")

        print("\n資料庫維護完成!")

    if __name__ == '__main__':
        run_maintenance()

## 系統監控

### 1. 效能監控

示例:實現效能監控

使用者請求: "為應用實現效能監控"

Claude Code 生成的程式碼:

    import time
    import psutil
    from functools import wraps
    from flask import request, g
    from prometheus_client import Counter, Histogram, Gauge, generate_latest
    import logging

    logger = logging.getLogger(__name__)

    # Prometheus metrics
    request_count = Counter(
        'http_requests_total', 'Total HTTP requests',
        labelnames=['method', 'endpoint', 'status'],
    )
    request_duration = Histogram(
        'http_request_duration_seconds', 'HTTP request duration',
        labelnames=['method', 'endpoint'],
    )

    # Gauges without labels: one current value each.
    active_connections = Gauge('active_connections', 'Number of active connections')
    memory_usage = Gauge('memory_usage_bytes', 'Memory usage in bytes')
    cpu_usage = Gauge('cpu_usage_percent', 'CPU usage percentage')
    disk_usage = Gauge('disk_usage_percent', 'Disk usage percentage')

    def track_request_metrics(f):
        """Decorate a view so every call updates the request counter and histogram.

        The counter is labelled with the request method, the endpoint
        (``'unknown'`` when Flask has none) and a status code; the histogram
        with method and endpoint.  Both are updated whether the view returns
        or raises, so failing requests also contribute a duration.

        Status code used for the counter:
            * ``response.status_code`` when the return value has one;
            * the second item of a returned tuple when it is an int;
            * 200 for any other return value;
            * 500 when the view raises (the exception propagates unchanged).
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            start_time = time.time()

            # Make the start time available to other code in this request.
            g.start_time = start_time

            # Stays 500 unless the view returns normally.
            status_code = 500
            try:
                response = f(*args, **kwargs)

                if hasattr(response, 'status_code'):
                    status_code = response.status_code
                elif (
                    isinstance(response, tuple)
                    and len(response) >= 2
                    and isinstance(response[1], int)
                ):
                    status_code = response[1]
                else:
                    status_code = 200

                return response
            finally:
                endpoint = request.endpoint or 'unknown'

                request_count.labels(
                    method=request.method,
                    endpoint=endpoint,
                    status=status_code
                ).inc()

                request_duration.labels(
                    method=request.method,
                    endpoint=endpoint
                ).observe(time.time() - start_time)

        return decorated_function

    def update_system_metrics():
        """Refresh the memory, CPU, disk and connection gauges from psutil.

        A failure to enumerate network connections is logged and skipped so
        the other three gauges are still refreshed and the caller's polling
        loop keeps running.
        """
        # Memory in use
        memory = psutil.virtual_memory()
        memory_usage.set(memory.used)

        # CPU utilisation
        cpu_usage.set(psutil.cpu_percent())

        # Disk utilisation of the root filesystem
        disk = psutil.disk_usage('/')
        disk_usage.set(disk.percent)

        # Open connections.
        # NOTE(review): setup_monitoring() also increments and decrements this
        # same gauge per request, so the two writers overwrite each other -
        # confirm which meaning the metric should have.
        try:
            active_connections.set(len(psutil.net_connections()))
        except (psutil.Error, OSError) as e:
            # psutil documents that this call needs elevated privileges on
            # some platforms; losing one gauge must not stop the others.
            logger.warning(f"Unable to read network connections: {e}")

    class PerformanceMonitor:
        """Collects call counts and timings for functions wrapped by track_function."""

        def __init__(self, app):
            """Remember *app* and start with an empty metrics table."""
            self.app = app
            self.metrics = {}

        def track_function(self, name):
            """Return a decorator that times each call and files it under *name*.

            A call that raises is recorded as a failure, logged, and the
            exception is re-raised.
            """
            def decorator(f):
                @wraps(f)
                def decorated_function(*args, **kwargs):
                    began = time.time()

                    try:
                        outcome = f(*args, **kwargs)
                        elapsed = time.time() - began
                        self.record_metric(name, elapsed, success=True)
                        return outcome
                    except Exception as e:
                        elapsed = time.time() - began
                        self.record_metric(name, elapsed, success=False)
                        logger.error(f"Error in {name}: {e}")
                        raise

                return decorated_function
            return decorator

        def record_metric(self, name, duration, success=True):
            """Fold one observation of *duration* into the entry for *name*."""
            entry = self.metrics.setdefault(name, {
                'count': 0,
                'total_duration': 0,
                'success_count': 0,
                'error_count': 0,
                'min_duration': float('inf'),
                'max_duration': 0
            })

            entry['count'] += 1
            entry['total_duration'] += duration

            outcome_key = 'success_count' if success else 'error_count'
            entry[outcome_key] += 1

            # Only replace an extreme when the new duration strictly beats it.
            if duration < entry['min_duration']:
                entry['min_duration'] = duration
            if duration > entry['max_duration']:
                entry['max_duration'] = duration

        def get_metrics(self):
            """Return the metrics table after adding average and success rate.

            ``avg_duration`` and ``success_rate`` are written into every entry
            that has at least one observation; the returned dict is the
            monitor's own table, not a copy.
            """
            for entry in self.metrics.values():
                calls = entry['count']
                if calls > 0:
                    entry['avg_duration'] = entry['total_duration'] / calls
                    entry['success_rate'] = entry['success_count'] / calls

            return self.metrics

        def get_slow_functions(self, threshold=1.0):
            """List entries whose average duration exceeds *threshold*, slowest first."""
            averaged = (
                (name, entry, entry['total_duration'] / entry['count'])
                for name, entry in self.metrics.items()
                if entry['count'] > 0
            )

            slow = [
                {
                    'name': name,
                    'avg_duration': average,
                    'count': entry['count'],
                    'max_duration': entry['max_duration']
                }
                for name, entry, average in averaged
                if average > threshold
            ]
            slow.sort(key=lambda item: item['avg_duration'], reverse=True)

            return slow

    def setup_monitoring(app):
        """Attach the metrics endpoint, request hooks and a metrics thread to *app*.

        Registers:
            * ``/metrics`` - the Prometheus exposition text, served with the
              content type that prometheus_client defines for it;
            * a ``before_request`` hook that stores the start time on ``g``
              and increments ``active_connections``;
            * an ``after_request`` hook that updates the request counter and
              the duration histogram;
            * a ``teardown_request`` hook that decrements
              ``active_connections``;
            * a daemon thread that calls ``update_system_metrics()`` every
              five seconds.

        Returns:
            The same *app* object.
        """
        import threading

        from flask import Response
        from prometheus_client import CONTENT_TYPE_LATEST

        @app.route('/metrics')
        def metrics():
            """Prometheus metrics endpoint."""
            return Response(generate_latest(), content_type=CONTENT_TYPE_LATEST)

        @app.before_request
        def before_request():
            """Record the start time and count the request as in flight."""
            g.start_time = time.time()
            active_connections.inc()
            # Marks that this request was counted, so teardown only undoes
            # increments that really happened.
            g.connection_counted = True

        @app.after_request
        def after_request(response):
            """Update the request counter and the duration histogram."""
            if hasattr(g, 'start_time'):
                duration = time.time() - g.start_time
                endpoint = request.endpoint or 'unknown'

                request_count.labels(
                    method=request.method,
                    endpoint=endpoint,
                    status=response.status_code
                ).inc()

                request_duration.labels(
                    method=request.method,
                    endpoint=endpoint
                ).observe(duration)

            return response

        @app.teardown_request
        def teardown_request(exc):
            """Release the in-flight count, including after unhandled errors."""
            # Flask may skip after_request functions when an unhandled
            # exception occurs, which would leave the gauge too high.
            if getattr(g, 'connection_counted', False):
                g.connection_counted = False
                active_connections.dec()

        # Refresh the system gauges periodically
        def update_metrics():
            while True:
                try:
                    update_system_metrics()
                except Exception as e:
                    # One failed sample must not end the polling thread.
                    logger.error(f"Error updating system metrics: {e}")
                time.sleep(5)

        thread = threading.Thread(target=update_metrics, daemon=True)
        thread.start()

        return app

### 2. 錯誤監控

示例:實現錯誤監控

使用者請求: "為應用實現錯誤監控"

Claude Code 生成的程式碼:

    import logging
    import traceback
    from datetime import datetime
    from typing import Dict, List
    from flask import request, g
    import sentry_sdk
    from sentry_sdk.integrations.flask import FlaskIntegration

    logger = logging.getLogger(__name__)

    class ErrorMonitor:
        """Keeps recent exceptions and per-type counters in memory.

        Each captured exception is stored as a dict, counted by exception
        type name, written to the module logger and forwarded to Sentry.
        """

        def __init__(self, app, max_errors=1000):
            """Create an empty monitor.

            Args:
                app: The application object; it is only stored.
                max_errors: Largest number of error records kept in
                    ``errors``; the oldest are dropped first.  ``None``
                    disables the limit.
            """
            self.app = app
            self.max_errors = max_errors
            self.errors = []
            self.error_stats = {}

        def capture_exception(self, exception, context=None):
            """Record *exception* together with request and user details.

            Args:
                exception: The exception instance to record.
                context: Optional dict stored under the ``context`` key.
            """
            error_data = {
                'type': type(exception).__name__,
                'message': str(exception),
                # Format the exception that was passed in, not whichever one
                # happens to be active in the calling frame.
                'traceback': ''.join(traceback.format_exception(
                    type(exception), exception, exception.__traceback__
                )),
                'timestamp': datetime.utcnow().isoformat(),
                'context': context or {}
            }

            # Request details
            if request:
                error_data['request'] = {
                    'method': request.method,
                    'path': request.path,
                    'url': request.url,
                    'ip': request.remote_addr,
                    'user_agent': request.user_agent.string
                }

            # User details.  Reading ``g`` with no application context active
            # raises RuntimeError rather than AttributeError, so both mean
            # "no user id available".
            try:
                error_data['user_id'] = g.user_id
            except (AttributeError, RuntimeError):
                pass

            self.errors.append(error_data)

            # Drop the oldest records once the limit is exceeded.
            if self.max_errors is not None:
                overflow = len(self.errors) - self.max_errors
                if overflow > 0:
                    del self.errors[:overflow]

            # Update the per-type counters
            error_type = error_data['type']
            if error_type not in self.error_stats:
                self.error_stats[error_type] = {
                    'count': 0,
                    'last_occurrence': None
                }

            self.error_stats[error_type]['count'] += 1
            self.error_stats[error_type]['last_occurrence'] = error_data['timestamp']

            # Write the log entry.  The record is nested under one key because
            # logging rejects ``extra`` keys such as 'message' that clash with
            # LogRecord attributes.
            logger.error(
                f"Exception captured: {error_type}",
                extra={'error_data': error_data}
            )

            # Forward to Sentry
            sentry_sdk.capture_exception(exception)

        def get_errors(self, limit=100):
            """Return the newest *limit* error records, oldest first.

            A limit of zero or less yields an empty list.
            """
            if limit <= 0:
                return []
            return self.errors[-limit:]

        def get_error_stats(self):
            """Return the per-type counters (the monitor's own dict, not a copy)."""
            return self.error_stats

        def get_frequent_errors(self, threshold=10):
            """List error types seen at least *threshold* times, most frequent first."""
            frequent_errors = [
                {
                    'type': error_type,
                    'count': stats['count'],
                    'last_occurrence': stats['last_occurrence']
                }
                for error_type, stats in self.error_stats.items()
                if stats['count'] >= threshold
            ]

            return sorted(frequent_errors, key=lambda x: x['count'], reverse=True)

        def clear_errors(self):
            """Forget every stored error record and counter."""
            self.errors = []
            self.error_stats = {}

    def setup_error_monitoring(app, sentry_dsn):
        """Initialise Sentry and register error handlers and report routes on *app*.

        Args:
            app: The Flask application to configure.
            sentry_dsn: DSN passed to ``sentry_sdk.init``.

        Returns:
            The same *app* object.
        """
        # werkzeug is installed with Flask; imported here because only this
        # function needs it.
        from werkzeug.exceptions import HTTPException

        # Initialise Sentry
        sentry_sdk.init(
            dsn=sentry_dsn,
            integrations=[FlaskIntegration()],
            traces_sample_rate=1.0,
            profiles_sample_rate=1.0
        )

        error_monitor = ErrorMonitor(app)

        @app.errorhandler(Exception)
        def handle_exception(e):
            """Record unexpected exceptions and answer with a generic 500."""
            # HTTP errors (403, 405, ...) are ordinary responses; hand them
            # back unchanged instead of turning them into a 500.
            if isinstance(e, HTTPException):
                return e

            error_monitor.capture_exception(e)

            # The exception text stays in the monitor and the logs; clients
            # get a fixed message so internal details are not disclosed.
            if request.is_json:
                return {'error': 'Internal server error'}, 500
            else:
                return 'Internal server error', 500

        @app.errorhandler(404)
        def handle_not_found(e):
            """Log the missing path and answer with a JSON 404."""
            logger.warning(f"404 Not Found: {request.path}")
            return {'error': 'Not found'}, 404

        @app.errorhandler(500)
        def handle_server_error(e):
            """Record the error and answer with a JSON 500."""
            error_monitor.capture_exception(e)
            return {'error': 'Internal server error'}, 500

        # NOTE(review): the three report routes below return stored request
        # details and tracebacks and have no access check in this function -
        # confirm they are protected elsewhere.
        @app.route('/admin/errors')
        def get_errors():
            """Return the stored error records."""
            errors = error_monitor.get_errors()
            return {'errors': errors}

        @app.route('/admin/errors/stats')
        def get_error_stats():
            """Return the per-type error counters."""
            stats = error_monitor.get_error_stats()
            return {'stats': stats}

        @app.route('/admin/errors/frequent')
        def get_frequent_errors():
            """Return the error types that occur most often."""
            frequent_errors = error_monitor.get_frequent_errors()
            return {'frequent_errors': frequent_errors}

        return app

## 預測性維護

### 1. 容量預測

示例:實現容量預測

使用者請求: "為應用實現容量預測"

Claude Code 生成的程式碼:

    import numpy as np
    from datetime import datetime, timedelta
    from typing import List, Dict
    import logging

    logger = logging.getLogger(__name__)

    class CapacityPredictor:
        """Linear-trend forecaster for named metrics.

        Samples are collected with add_metric(); predict_capacity() fits a
        straight line through one metric's samples and extends it forward;
        the remaining methods turn stored predictions into alerts and
        scaling recommendations.
        """

        def __init__(self):
            """Start with no samples and no stored predictions."""
            self.history = []
            self.predictions = {}

        def add_metric(self, timestamp: datetime, metric_name: str, value: float):
            """Append one sample of *metric_name* taken at *timestamp*."""
            self.history.append({
                'timestamp': timestamp,
                'metric': metric_name,
                'value': value
            })

        def predict_capacity(self, metric_name: str, days: int = 7,
                             min_samples: int = 30) -> List[Dict]:
            """Extrapolate *metric_name* for the next *days* steps.

            The samples are ordered by timestamp before fitting, so the
            result does not depend on the order in which they were added.
            Step ``k`` is the latest sample value plus ``k`` times the fitted
            slope, dated ``k`` days after the current UTC time.

            Args:
                metric_name: Metric to forecast.
                days: Number of predictions to produce.
                min_samples: Fewest samples required before predicting.

            Returns:
                A list of dicts with ``date`` (ISO string), ``value`` and
                ``metric``; an empty list when there are too few samples.
                The list is also stored in ``self.predictions``.
            """
            # Samples of this metric in chronological order
            data = sorted(
                (entry for entry in self.history if entry['metric'] == metric_name),
                key=lambda entry: entry['timestamp']
            )

            if not data or len(data) < min_samples:
                logger.warning(f"Insufficient data for {metric_name}")
                return []

            values = [entry['value'] for entry in data]

            # NOTE(review): the slope is per sample but is applied per day, so
            # this presumably expects one sample per day - confirm.
            trend = self._calculate_trend(values)

            # One reference time for the whole forecast keeps the predicted
            # dates exactly one day apart.
            now = datetime.utcnow()
            predictions = [
                {
                    'date': (now + timedelta(days=step)).isoformat(),
                    'value': values[-1] + trend * step,
                    'metric': metric_name
                }
                for step in range(1, days + 1)
            ]

            self.predictions[metric_name] = predictions

            return predictions

        def _calculate_trend(self, values: List[float]) -> float:
            """Return the slope of the least-squares line through *values*.

            The x coordinates are the positions 0, 1, 2, ... of the values.
            Fewer than two values give a slope of 0.
            """
            if len(values) < 2:
                return 0.0

            x = np.arange(len(values))
            y = np.array(values)

            # polyfit returns the coefficients highest degree first, so
            # element 0 is the slope; convert it to a plain float.
            return float(np.polyfit(x, y, 1)[0])

        def check_capacity_alerts(self, threshold: float = 0.9) -> List[Dict]:
            """List stored predictions whose value reaches *threshold*.

            Returns:
                One dict per matching prediction (``metric``, ``date``,
                ``value``, ``threshold``), highest value first.
            """
            alerts = [
                {
                    'metric': metric_name,
                    'date': prediction['date'],
                    'value': prediction['value'],
                    'threshold': threshold
                }
                for metric_name, predictions in self.predictions.items()
                for prediction in predictions
                if prediction['value'] >= threshold
            ]

            return sorted(alerts, key=lambda x: x['value'], reverse=True)

        def get_capacity_recommendations(self) -> List[Dict]:
            """Turn the stored predictions into scaling recommendations.

            Returns:
                At most one ``scale_up`` entry carrying every alert at the
                default threshold, followed by one ``scale_down`` entry for
                each metric whose mean predicted value is below 0.3.
            """
            recommendations = []

            alerts = self.check_capacity_alerts()

            if alerts:
                recommendations.append({
                    'type': 'scale_up',
                    'message': f"发现 {len(alerts)} 个容量告警,建议扩容",
                    'alerts': alerts
                })

            # Look for metrics that are predicted to stay under-used
            for metric_name, predictions in self.predictions.items():
                # A forecast with no entries has no mean to compare.
                if not predictions:
                    continue

                avg_value = np.mean([p['value'] for p in predictions])

                if avg_value < 0.3:
                    recommendations.append({
                        'type': 'scale_down',
                        'message': f"{metric_name} 平均利用率较低,建议缩容",
                        'metric': metric_name,
                        'avg_value': avg_value
                    })

            return recommendations

    def run_capacity_prediction():
        """Demonstrate the predictor on thirty days of synthetic CPU samples.

        Prints the seven-day forecast, any alerts at the 0.9 threshold and
        the resulting recommendations.
        """
        predictor = CapacityPredictor()

        # Seed the history with example data: a rising line plus random noise
        reference = datetime.utcnow()
        for i, days_back in enumerate(range(30, 0, -1)):
            sample = 0.5 + (i / 100) + np.random.normal(0, 0.05)
            predictor.add_metric(
                reference - timedelta(days=days_back), 'cpu_usage', sample
            )

        # Forecast
        print("预测 CPU 使用率...")
        forecast = predictor.predict_capacity('cpu_usage', days=7)

        print("\n预测结果:")
        for point in forecast:
            print(f"  {point['date']}: {point['value']:.2%}")

        # Alerts
        print("\n检查容量告警...")
        triggered = predictor.check_capacity_alerts(threshold=0.9)

        if not triggered:
            print("未发现容量告警")
        else:
            print(f"发现 {len(triggered)} 个告警:")
            for item in triggered:
                print(f"  {item['date']}: {item['value']:.2%} (阈值: {item['threshold']:.0%})")

        # Recommendations
        print("\n获取容量建议...")
        for advice in predictor.get_capacity_recommendations():
            print(f"  {advice['type']}: {advice['message']}")

    if __name__ == '__main__':
        run_capacity_prediction()

基於 MIT 許可發布 | 永久導航