# 25.6 智能维护
## 智能维护概述
Claude Code 可以自动化维护任务、监控系统健康状态、预测潜在问题,并提供维护优化建议。通过智能维护功能,开发者可以降低维护成本、提高系统可靠性。
## 自动化维护
### 1. 依赖管理
示例:自动化依赖管理
用户请求: "为项目实现自动化依赖管理"
Claude Code 生成的代码:
## requirements.txt
```text
Flask-SQLAlchemy==3.0.5
Flask-JWT-Extended==4.5.2
Flask-Mail==0.9.1
Flask-Migrate==4.0.5
gunicorn==21.2.0
psycopg2-binary==2.9.7
redis==4.6.0
celery==5.3.4
python-dotenv==1.0.0
```
## requirements-dev.txt
```text
pytest==7.4.2
pytest-cov==4.1.0
pytest-mock==3.11.1
black==23.9.1
flake8==6.1.0
mypy==1.5.1
isort==5.12.0
pre-commit==3.4.0
```
## 依赖更新脚本
```python
#!/usr/bin/env python3
"""
依赖更新脚本
自动检查并更新过期的依赖包
"""
import subprocess
import json
from typing import List, Dict
import sys
def get_outdated_packages() -> List[Dict]:
    """Return the installed packages for which pip reports a newer release.

    Runs ``pip list --outdated --format=json`` and decodes its output.
    Failures are reported on stdout and turned into an empty list, in line
    with the other subprocess helpers of this script, so a missing ``pip``
    executable or unparsable output cannot crash the caller.

    Returns:
        The decoded JSON list from pip, or ``[]`` when pip could not be
        started, exited with a non-zero status, or printed invalid JSON.
    """
    try:
        result = subprocess.run(
            ['pip', 'list', '--outdated', '--format=json'],
            capture_output=True,
            text=True
        )
    except OSError as e:
        # Raised when the pip executable cannot be launched at all.
        print(f"Error: Failed to get outdated packages: {e}")
        return []

    if result.returncode != 0:
        print("Error: Failed to get outdated packages")
        return []

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error: Failed to parse pip output: {e}")
        return []
def check_compatibility(package: str, version: str) -> bool:
    """Report whether pip accepts ``package`` pinned at ``version``.

    The check is a ``pip install --dry-run``, so nothing is installed.

    Args:
        package: Distribution name.
        version: Exact version to try.

    Returns:
        True when pip exits with status 0; False when it exits non-zero or
        when running it raises.
    """
    requirement = f'{package}=={version}'
    command = ['pip', 'install', '--dry-run', requirement]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
    except Exception as e:
        print(f"Error checking compatibility for {package}: {e}")
        return False
    return completed.returncode == 0
def update_package(package: str, version: str) -> bool:
    """Install ``package`` at exactly ``version`` via ``pip install --upgrade``.

    Args:
        package: Distribution name.
        version: Exact version to install.

    Returns:
        True when pip exits with status 0; False when it exits non-zero or
        when running it raises.
    """
    requirement = f'{package}=={version}'
    command = ['pip', 'install', '--upgrade', requirement]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
    except Exception as e:
        print(f"Error updating {package}: {e}")
        return False
    return completed.returncode == 0
def update_requirements():
    """Rewrite ``requirements.txt`` from the output of ``pip freeze``.

    The file is only opened for writing after ``pip freeze`` has succeeded,
    so a failed run cannot truncate the existing file to empty output.

    Returns:
        True when the file was written, False on any failure.
    """
    try:
        result = subprocess.run(
            ['pip', 'freeze'],
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            print(
                "Error updating requirements.txt: "
                f"pip freeze exited with status {result.returncode}"
            )
            return False

        with open('requirements.txt', 'w', encoding='utf-8') as f:
            f.write(result.stdout)
        return True
    except Exception as e:
        print(f"Error updating requirements.txt: {e}")
        return False
def main():
    """Interactively upgrade every outdated dependency that pip accepts.

    Steps: list the outdated packages, dry-run each upgrade, ask the user to
    confirm, install the upgrades, then regenerate requirements.txt.
    """
    print("检查过期依赖...")
    outdated = get_outdated_packages()
    if not outdated:
        print("所有依赖都是最新的!")
        return

    print(f"\n发现 {len(outdated)} 个过期包:")
    for entry in outdated:
        print(f" - {entry['name']}: {entry['version']} -> {entry['latest_version']}")

    print("\n检查兼容性...")
    upgradable = []
    for entry in outdated:
        name, target = entry['name'], entry['latest_version']
        if not check_compatibility(name, target):
            print(f" ✗ {name} {target} (不兼容)")
            continue
        upgradable.append(entry)
        print(f" ✓ {name} {target}")

    if not upgradable:
        print("\n没有可更新的包")
        return

    print(f"\n准备更新 {len(upgradable)} 个包...")
    answer = input("是否继续?(y/n): ")
    if answer.lower() != 'y':
        print("取消更新")
        return

    print("\n开始更新...")
    for entry in upgradable:
        name, target = entry['name'], entry['latest_version']
        print(f"更新 {name} 到 {target}...")
        succeeded = update_package(name, target)
        print(f" ✓ {name} 更新成功" if succeeded else f" ✗ {name} 更新失败")

    print("\n更新 requirements.txt...")
    written = update_requirements()
    print(" ✓ requirements.txt 更新成功" if written else " ✗ requirements.txt 更新失败")

    print("\n依赖更新完成!")


if __name__ == '__main__':
    main()
```
## 依赖安全检查
```python
#!/usr/bin/env python3
"""
依赖安全检查脚本
检查依赖包的安全漏洞
"""
import subprocess
import json
from typing import List, Dict
def check_security() -> List[Dict]:
    """Audit the installed dependencies for known vulnerabilities.

    pip itself has no ``audit`` subcommand, so the check is delegated to the
    separate ``pip-audit`` tool, which must be installed in the environment.
    pip-audit exits with a non-zero status when it *finds* vulnerabilities,
    so the exit status alone is not treated as a failure: whenever a JSON
    report was printed it is parsed.

    Returns:
        One dict per vulnerability with the keys ``name``, ``version``,
        ``advisory``, ``severity`` and ``fix_versions``. An empty list means
        nothing was found or the audit could not be run.
    """
    try:
        result = subprocess.run(
            ['pip-audit', '--format=json'],
            capture_output=True,
            text=True
        )
        if not result.stdout.strip():
            # No report at all: the tool failed before producing output.
            print("Error: Failed to check security")
            return []
        report = json.loads(result.stdout)
    except Exception as e:
        print(f"Error checking security: {e}")
        return []

    # Current pip-audit releases wrap the list as {"dependencies": [...]};
    # a bare list is accepted too in case an older release emits one.
    dependencies = report.get('dependencies', []) if isinstance(report, dict) else report

    vulnerabilities = []
    for dependency in dependencies:
        for vuln in dependency.get('vulns', []):
            vulnerabilities.append({
                'name': dependency.get('name'),
                'version': dependency.get('version'),
                'advisory': vuln.get('id'),
                # NOTE(review): the pip-audit report does not appear to carry
                # a severity field, so this normally stays 'unknown'.
                'severity': vuln.get('severity', 'unknown'),
                'fix_versions': vuln.get('fix_versions', []),
            })
    return vulnerabilities
def main():
    """Run the dependency audit and print each reported vulnerability."""
    print("检查依赖安全性...")
    findings = check_security()
    if not findings:
        print("未发现安全漏洞!")
        return

    print(f"\n发现 {len(findings)} 个安全漏洞:")
    for finding in findings:
        print(f"\n 包: {finding['name']}")
        print(f" 版本: {finding['version']}")
        print(f" 漏洞: {finding['advisory']}")
        print(f" 严重性: {finding['severity']}")
        print(f" 修复版本: {finding['fix_versions']}")


if __name__ == '__main__':
    main()
```
### 2. 数据库维护
示例:自动化数据库维护
用户请求: "为应用实现自动化数据库维护"
Claude Code 生成的代码:
```python
from datetime import datetime, timedelta
from sqlalchemy import text
from app import db
import logging
logger = logging.getLogger(__name__)
class DatabaseMaintenance:
    """Routine PostgreSQL housekeeping driven through a SQLAlchemy session.

    Groups statistics refresh (ANALYZE), space reclamation (VACUUM), index
    rebuilds (REINDEX), pruning of old rows and a few diagnostic queries.
    Every operation logs its outcome instead of raising, so one failing
    step does not stop the remaining ones.
    """

    # Tables covered by analyze_tables() and vacuum_tables().
    TABLES = ('users', 'products', 'orders', 'payments')

    # Indexes rebuilt by rebuild_indexes().
    INDEXES = (
        'idx_users_username',
        'idx_users_email',
        'idx_orders_user_id',
        'idx_orders_status',
        'idx_products_name',
    )

    def __init__(self, db_session):
        """Keep the session that every maintenance statement runs on.

        Args:
            db_session: SQLAlchemy session object; it must provide
                ``execute``, ``commit``, ``rollback`` and ``get_bind``.
        """
        self.db = db_session

    def _execute_and_commit(self, statement):
        """Run one SQL statement on the session and commit it.

        On failure the session is rolled back before the error is re-raised,
        so a failed statement does not leave an aborted transaction behind
        for the statements that follow.
        """
        try:
            self.db.execute(text(statement))
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def analyze_tables(self):
        """Run ``ANALYZE`` on every table in ``TABLES``; failures are logged."""
        for table in self.TABLES:
            try:
                self._execute_and_commit(f"ANALYZE {table}")
                logger.info(f"Table {table} analyzed successfully")
            except Exception as e:
                logger.error(f"Error analyzing table {table}: {e}")

    def vacuum_tables(self):
        """Run ``VACUUM ANALYZE`` on every table in ``TABLES``; failures are logged.

        PostgreSQL refuses to run VACUUM inside a transaction block, and a
        session always executes inside one, so each statement is issued on a
        separate connection switched to AUTOCOMMIT.
        """
        for table in self.TABLES:
            try:
                # NOTE(review): assumes the session is bound to an Engine
                # (true for Flask-SQLAlchemy's db.session) - confirm.
                engine = self.db.get_bind()
                with engine.connect().execution_options(
                    isolation_level="AUTOCOMMIT"
                ) as connection:
                    connection.execute(text(f"VACUUM ANALYZE {table}"))
                logger.info(f"Table {table} vacuumed successfully")
            except Exception as e:
                logger.error(f"Error vacuuming table {table}: {e}")

    def rebuild_indexes(self):
        """Run ``REINDEX INDEX`` on every index in ``INDEXES``; failures are logged."""
        for index in self.INDEXES:
            try:
                self._execute_and_commit(f"REINDEX INDEX {index}")
                logger.info(f"Index {index} rebuilt successfully")
            except Exception as e:
                logger.error(f"Error rebuilding index {index}: {e}")

    def clean_old_logs(self, days=30):
        """Delete rows of ``logs`` whose ``created_at`` is older than ``days`` days.

        Args:
            days: Age limit in days, measured from the current UTC time.

        Returns:
            Number of deleted rows, or 0 when the delete failed (the session
            is rolled back in that case).
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        try:
            result = self.db.execute(
                text("""
                    DELETE FROM logs
                    WHERE created_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )
            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old log entries")
            self.db.commit()
            return deleted_count
        except Exception as e:
            logger.error(f"Error cleaning old logs: {e}")
            self.db.rollback()
            return 0

    def clean_old_sessions(self, days=7):
        """Delete rows of ``sessions`` whose ``expires_at`` is older than ``days`` days.

        Args:
            days: Age limit in days, measured from the current UTC time.

        Returns:
            Number of deleted rows, or 0 when the delete failed (the session
            is rolled back in that case).
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        try:
            result = self.db.execute(
                text("""
                    DELETE FROM sessions
                    WHERE expires_at < :cutoff_date
                """),
                {'cutoff_date': cutoff_date}
            )
            deleted_count = result.rowcount
            logger.info(f"Deleted {deleted_count} old sessions")
            self.db.commit()
            return deleted_count
        except Exception as e:
            logger.error(f"Error cleaning old sessions: {e}")
            self.db.rollback()
            return 0

    def optimize_database(self):
        """Run analyze_tables(), vacuum_tables() and rebuild_indexes() in order."""
        try:
            self.analyze_tables()
            self.vacuum_tables()
            self.rebuild_indexes()
            logger.info("Database optimization completed successfully")
        except Exception as e:
            logger.error(f"Error optimizing database: {e}")

    def get_database_stats(self):
        """Collect table sizes, index usage and the slowest statements.

        Each query runs independently: when one fails (for example because
        the pg_stat_statements extension is not installed) the error is
        logged, the session is rolled back and the corresponding entry stays
        an empty list, so callers can always index all three keys.

        Returns:
            Dict with the keys ``table_sizes``, ``index_usage`` and
            ``slow_queries``, each mapping to a list of row dicts.
        """
        stats = {'table_sizes': [], 'index_usage': [], 'slow_queries': []}

        queries = (
            (
                'table_sizes',
                """
                SELECT
                    schemaname,
                    tablename,
                    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
                FROM pg_tables
                WHERE schemaname = 'public'
                ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
                """,
                ('schema', 'table', 'size'),
            ),
            (
                'index_usage',
                # pg_stat_user_indexes names these columns relname and
                # indexrelname; tablename/indexname belong to pg_indexes.
                """
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read,
                    idx_tup_fetch
                FROM pg_stat_user_indexes
                ORDER BY idx_scan DESC
                """,
                ('schema', 'table', 'index', 'scans', 'tuples_read', 'tuples_fetched'),
            ),
            (
                'slow_queries',
                # NOTE(review): PostgreSQL 13+ renamed these columns to
                # total_exec_time / mean_exec_time / max_exec_time - adjust
                # to the server version in use.
                """
                SELECT
                    query,
                    calls,
                    total_time,
                    mean_time,
                    max_time
                FROM pg_stat_statements
                ORDER BY mean_time DESC
                LIMIT 10
                """,
                ('query', 'calls', 'total_time', 'mean_time', 'max_time'),
            ),
        )

        for key, sql, columns in queries:
            try:
                rows = self.db.execute(text(sql))
                stats[key] = [dict(zip(columns, row)) for row in rows]
            except Exception as e:
                logger.error(f"Error getting database stats: {e}")
                self.db.rollback()

        return stats
def run_maintenance():
    """Run the full maintenance cycle and print a short summary.

    Optimises the database, prunes old log and session rows, then prints
    how many entries each statistics query returned. The summary reads the
    statistics with ``.get`` because ``get_database_stats`` may return a
    dict that lacks some keys when one of its queries failed.
    """
    # NOTE(review): db.session normally needs an active Flask application
    # context - confirm how this script is expected to be launched.
    maintenance = DatabaseMaintenance(db.session)
    print("开始数据库维护...")

    # Optimise the database.
    print("优化数据库...")
    maintenance.optimize_database()

    # Prune stale rows.
    print("清理旧日志...")
    maintenance.clean_old_logs(days=30)
    print("清理旧会话...")
    maintenance.clean_old_sessions(days=7)

    # Collect statistics.
    print("获取数据库统计信息...")
    stats = maintenance.get_database_stats()
    print("\n数据库统计信息:")
    print(f" 表数量: {len(stats.get('table_sizes', []))}")
    print(f" 索引数量: {len(stats.get('index_usage', []))}")
    print(f" 慢查询数量: {len(stats.get('slow_queries', []))}")
    print("\n数据库维护完成!")


if __name__ == '__main__':
    run_maintenance()
```
## 系统监控
### 1. 性能监控
示例:实现性能监控
用户请求: "为应用实现性能监控"
Claude Code 生成的代码:
```python
import time
import psutil
from functools import wraps
from flask import request, g
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import logging
logger = logging.getLogger(__name__)
# Prometheus metrics shared by the request hooks and the system sampler.

# Request counter, labelled by HTTP method, endpoint and status.
request_count = Counter(
    name='http_requests_total',
    documentation='Total HTTP requests',
    labelnames=['method', 'endpoint', 'status'],
)

# Request latency histogram, labelled by HTTP method and endpoint.
request_duration = Histogram(
    name='http_request_duration_seconds',
    documentation='HTTP request duration',
    labelnames=['method', 'endpoint'],
)

# Unlabelled gauges.
active_connections = Gauge(
    name='active_connections',
    documentation='Number of active connections',
)
memory_usage = Gauge(
    name='memory_usage_bytes',
    documentation='Memory usage in bytes',
)
cpu_usage = Gauge(
    name='cpu_usage_percent',
    documentation='CPU usage percentage',
)
disk_usage = Gauge(
    name='disk_usage_percent',
    documentation='Disk usage percentage',
)
def track_request_metrics(f):
    """Decorate a Flask view so that every call is counted and timed.

    Both ``request_count`` and ``request_duration`` are updated whether the
    view returns or raises, so failing requests show up in the latency
    histogram as well. A view that raises is recorded with status 500 and
    the exception is re-raised unchanged.

    Args:
        f: The view function to wrap.

    Returns:
        The wrapped view.
    """
    # NOTE(review): setup_monitoring() registers an after_request hook that
    # updates the same two metrics; using both counts each request twice.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        start_time = time.time()
        # Publish the start time on the request-scoped ``g`` object.
        g.start_time = start_time

        def record(status_code):
            endpoint = request.endpoint or 'unknown'
            request_count.labels(
                method=request.method,
                endpoint=endpoint,
                status=status_code
            ).inc()
            request_duration.labels(
                method=request.method,
                endpoint=endpoint
            ).observe(time.time() - start_time)

        try:
            response = f(*args, **kwargs)
        except Exception:
            record(500)
            raise

        # Views may return ``(body, status)`` / ``(body, status, headers)``
        # tuples instead of a response object.
        if (
            isinstance(response, tuple)
            and len(response) > 1
            and isinstance(response[1], int)
        ):
            status_code = response[1]
        else:
            status_code = getattr(response, 'status_code', 200)

        record(status_code)
        return response

    return decorated_function
def update_system_metrics():
    """Refresh the memory, CPU, disk and connection gauges from psutil.

    Listing network connections can require elevated privileges on some
    platforms; a psutil error there is logged and skipped, so the other
    gauges are still refreshed and the caller's loop keeps running.
    """
    # Memory in use, in bytes.
    memory_usage.set(psutil.virtual_memory().used)

    # CPU utilisation as a percentage.
    cpu_usage.set(psutil.cpu_percent())

    # Usage of the root filesystem as a percentage.
    disk_usage.set(psutil.disk_usage('/').percent)

    # Number of sockets reported by psutil.
    # NOTE(review): setup_monitoring() also inc()/dec()s this same gauge per
    # HTTP request, so the two writers overwrite each other - decide which
    # meaning the gauge should have.
    try:
        active_connections.set(len(psutil.net_connections()))
    except psutil.Error as e:
        logger.warning(f"Unable to read network connections: {e}")
class PerformanceMonitor:
    """Collect in-memory timing statistics for named functions.

    For every name the monitor keeps the call count, the total, minimum and
    maximum duration in seconds, and success and error counts.
    """

    def __init__(self, app):
        """Create an empty monitor.

        Args:
            app: Application object; only stored on the instance.
        """
        self.app = app
        self.metrics = {}

    def track_function(self, name):
        """Return a decorator that times each call of the wrapped function.

        Durations are taken with ``time.perf_counter()``, a monotonic clock,
        so wall-clock adjustments cannot produce negative or skewed values.
        A call that raises is recorded as an error, logged and re-raised.

        Args:
            name: Key under which the measurements are stored.
        """
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                started = time.perf_counter()
                try:
                    result = f(*args, **kwargs)
                except Exception as e:
                    self.record_metric(name, time.perf_counter() - started, success=False)
                    logger.error(f"Error in {name}: {e}")
                    raise
                self.record_metric(name, time.perf_counter() - started, success=True)
                return result
            return decorated_function
        return decorator

    def record_metric(self, name, duration, success=True):
        """Add one measurement to the statistics stored under ``name``.

        Args:
            name: Metric key; created on first use.
            duration: Elapsed time in seconds.
            success: Whether the measured call completed without raising.
        """
        metric = self.metrics.setdefault(name, {
            'count': 0,
            'total_duration': 0,
            'success_count': 0,
            'error_count': 0,
            'min_duration': float('inf'),
            'max_duration': 0
        })
        metric['count'] += 1
        metric['total_duration'] += duration
        metric['success_count' if success else 'error_count'] += 1
        metric['min_duration'] = min(metric['min_duration'], duration)
        metric['max_duration'] = max(metric['max_duration'], duration)

    def get_metrics(self):
        """Return all statistics, with the derived values brought up to date.

        ``avg_duration`` and ``success_rate`` are (re)computed in place for
        every metric that has at least one measurement.

        Returns:
            The monitor's own metrics dict (not a copy).
        """
        for metric in self.metrics.values():
            if metric['count'] > 0:
                metric['avg_duration'] = metric['total_duration'] / metric['count']
                metric['success_rate'] = metric['success_count'] / metric['count']
        return self.metrics

    def get_slow_functions(self, threshold=1.0):
        """List the metrics whose average duration exceeds ``threshold``.

        Args:
            threshold: Average duration in seconds above which a function
                counts as slow.

        Returns:
            Dicts with ``name``, ``avg_duration``, ``count`` and
            ``max_duration``, slowest first.
        """
        slow_functions = []
        for name, metric in self.metrics.items():
            if metric['count'] <= 0:
                continue
            avg_duration = metric['total_duration'] / metric['count']
            if avg_duration > threshold:
                slow_functions.append({
                    'name': name,
                    'avg_duration': avg_duration,
                    'count': metric['count'],
                    'max_duration': metric['max_duration']
                })
        return sorted(slow_functions, key=lambda x: x['avg_duration'], reverse=True)
def setup_monitoring(app):
    """Attach Prometheus monitoring to a Flask application.

    Registers a ``/metrics`` endpoint, before/after request hooks that
    update the request metrics, and a daemon thread that refreshes the
    system gauges every 5 seconds.

    Args:
        app: The Flask application to instrument.

    Returns:
        The same ``app`` object.
    """
    import threading
    from prometheus_client import CONTENT_TYPE_LATEST

    @app.route('/metrics')
    def metrics():
        """Prometheus scrape endpoint."""
        # Send the exposition-format content type explicitly; Flask's
        # default would label the payload as HTML.
        return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}

    @app.before_request
    def before_request():
        """Remember the start time and count the request as in flight."""
        g.start_time = time.time()
        active_connections.inc()

    @app.after_request
    def after_request(response):
        """Record count and duration, then release the in-flight slot."""
        if hasattr(g, 'start_time'):
            duration = time.time() - g.start_time
            endpoint = request.endpoint or 'unknown'
            request_count.labels(
                method=request.method,
                endpoint=endpoint,
                status=response.status_code
            ).inc()
            request_duration.labels(
                method=request.method,
                endpoint=endpoint
            ).observe(duration)
        active_connections.dec()
        return response

    def update_metrics():
        """Refresh the system gauges forever, surviving individual failures."""
        while True:
            try:
                update_system_metrics()
            except Exception as e:
                # Keep the sampler alive; an uncaught error would end the
                # thread and freeze the gauges at their last value.
                logger.error(f"Error updating system metrics: {e}")
            time.sleep(5)

    thread = threading.Thread(target=update_metrics, daemon=True)
    thread.start()
    return app
```
### 2. 错误监控
示例:实现错误监控
用户请求: "为应用实现错误监控"
Claude Code 生成的代码:
```python
import logging
import traceback
from datetime import datetime
from typing import Dict, List
from flask import request, g
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
logger = logging.getLogger(__name__)
class ErrorMonitor:
    """In-memory collector of application exceptions with per-type counters."""

    def __init__(self, app, max_errors=1000):
        """Create an empty monitor.

        Args:
            app: Application object; only stored on the instance.
            max_errors: Upper bound on the number of error records kept in
                memory (oldest are dropped first); ``None`` disables the
                bound.
        """
        self.app = app
        self.max_errors = max_errors
        self.errors = []
        self.error_stats = {}

    def capture_exception(self, exception, context=None):
        """Record ``exception``, update the counters, log it and send it to Sentry.

        Args:
            exception: The exception instance to record.
            context: Optional dict of extra information stored with the
                record.
        """
        error_data = {
            'type': type(exception).__name__,
            'message': str(exception),
            # Format the traceback of the exception that was passed in,
            # not of whatever exception happens to be handled right now.
            'traceback': ''.join(traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )),
            'timestamp': datetime.utcnow().isoformat(),
            'context': context or {}
        }

        # Attach request details when a request is being handled.
        if request:
            error_data['request'] = {
                'method': request.method,
                'path': request.path,
                'url': request.url,
                'ip': request.remote_addr,
                'user_agent': request.user_agent.string
            }

        # Attach the user id when the application stored one on ``g``.
        if g and hasattr(g, 'user_id'):
            error_data['user_id'] = g.user_id

        self.errors.append(error_data)
        if self.max_errors is not None:
            overflow = len(self.errors) - self.max_errors
            if overflow > 0:
                del self.errors[:overflow]

        # Update the per-type statistics.
        error_type = error_data['type']
        stats = self.error_stats.setdefault(
            error_type, {'count': 0, 'last_occurrence': None}
        )
        stats['count'] += 1
        stats['last_occurrence'] = error_data['timestamp']

        # ``extra`` keys must not collide with LogRecord attributes such as
        # ``message``, so the record is nested under a single key.
        logger.error(
            f"Exception captured: {error_type}",
            extra={'error_data': error_data}
        )

        # Forward to Sentry.
        sentry_sdk.capture_exception(exception)

    def get_errors(self, limit=100):
        """Return the most recent ``limit`` error records, oldest first.

        A ``limit`` of zero or less yields an empty list.
        """
        if limit <= 0:
            return []
        return self.errors[-limit:]

    def get_error_stats(self):
        """Return the per-type statistics dict (count and last occurrence)."""
        return self.error_stats

    def get_frequent_errors(self, threshold=10):
        """List error types seen at least ``threshold`` times, most frequent first.

        Returns:
            Dicts with ``type``, ``count`` and ``last_occurrence``.
        """
        frequent_errors = [
            {
                'type': error_type,
                'count': stats['count'],
                'last_occurrence': stats['last_occurrence']
            }
            for error_type, stats in self.error_stats.items()
            if stats['count'] >= threshold
        ]
        return sorted(frequent_errors, key=lambda x: x['count'], reverse=True)

    def clear_errors(self):
        """Drop every stored error record and reset the statistics."""
        self.errors = []
        self.error_stats = {}
def setup_error_monitoring(app, sentry_dsn):
    """Wire Sentry and an in-process ErrorMonitor into a Flask application.

    Registers error handlers for generic exceptions, 404 and 500, plus three
    ``/admin/errors`` endpoints that expose the collected data.

    Args:
        app: The Flask application to instrument.
        sentry_dsn: DSN passed to ``sentry_sdk.init``.

    Returns:
        The same ``app`` object.
    """
    # Initialise Sentry.
    # NOTE(review): both sample rates are 1.0, i.e. every request is traced
    # and profiled - confirm this is intended outside development.
    sentry_sdk.init(
        dsn=sentry_dsn,
        integrations=[FlaskIntegration()],
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0
    )

    error_monitor = ErrorMonitor(app)

    @app.errorhandler(Exception)
    def handle_exception(e):
        """Record unexpected exceptions and answer with a 500."""
        # HTTP errors without a dedicated handler (405, 400, ...) reach this
        # catch-all as well because they are exceptions too. They are
        # recognised by their integer ``code`` plus ``get_response`` and
        # returned unchanged so they keep their own status.
        if isinstance(getattr(e, 'code', None), int) and callable(
            getattr(e, 'get_response', None)
        ):
            return e

        error_monitor.capture_exception(e)
        # NOTE(review): the exception text is sent to the client verbatim,
        # which may disclose internal details.
        if request.is_json:
            return {'error': str(e)}, 500
        return str(e), 500

    @app.errorhandler(404)
    def handle_not_found(e):
        """Log the missing path and answer with a JSON 404."""
        logger.warning(f"404 Not Found: {request.path}")
        return {'error': 'Not found'}, 404

    @app.errorhandler(500)
    def handle_server_error(e):
        """Record the error and answer with a generic JSON 500."""
        error_monitor.capture_exception(e)
        return {'error': 'Internal server error'}, 500

    # NOTE(review): the /admin/errors endpoints below carry no access
    # control in this function - confirm they are protected elsewhere.
    @app.route('/admin/errors')
    def get_errors():
        """Return the most recent error records."""
        errors = error_monitor.get_errors()
        return {'errors': errors}

    @app.route('/admin/errors/stats')
    def get_error_stats():
        """Return the per-type error statistics."""
        stats = error_monitor.get_error_stats()
        return {'stats': stats}

    @app.route('/admin/errors/frequent')
    def get_frequent_errors():
        """Return the error types that occur frequently."""
        frequent_errors = error_monitor.get_frequent_errors()
        return {'frequent_errors': frequent_errors}

    return app
```
## 预测性维护
### 1. 容量预测
示例:实现容量预测
用户请求: "为应用实现容量预测"
Claude Code 生成的代码:
```python
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
class CapacityPredictor:
    """Forecast metric values by extrapolating a linear trend.

    Samples are collected with ``add_metric``; ``predict_capacity`` fits a
    straight line through one metric's samples and extends it day by day.
    The latest forecast per metric is kept in ``predictions`` and feeds the
    alert and recommendation helpers.
    """

    def __init__(self):
        """Start with no samples and no forecasts."""
        self.history = []
        self.predictions = {}

    def add_metric(self, timestamp: datetime, metric_name: str, value: float):
        """Store one sample of ``metric_name`` taken at ``timestamp``."""
        self.history.append({
            'timestamp': timestamp,
            'metric': metric_name,
            'value': value
        })

    def predict_capacity(self, metric_name: str, days: int = 7,
                         min_samples: int = 30) -> List[Dict]:
        """Forecast ``metric_name`` for the next ``days`` days.

        The samples are ordered by timestamp before the trend is fitted, so
        the order in which they were added does not matter. Each forecast is
        the last sample's value plus the per-sample slope times the number
        of steps ahead.

        Args:
            metric_name: Metric to forecast.
            days: Number of future days to produce.
            min_samples: Minimum number of samples required to forecast.

        Returns:
            One dict per day with ``date`` (ISO string), ``value`` and
            ``metric``; an empty list when fewer than ``min_samples``
            samples exist for the metric.
        """
        data = sorted(
            (entry for entry in self.history if entry['metric'] == metric_name),
            key=lambda entry: entry['timestamp']
        )
        if len(data) < min_samples:
            logger.warning(f"Insufficient data for {metric_name}")
            return []

        values = [entry['value'] for entry in data]
        trend = self._calculate_trend(values)

        predictions = []
        for step in range(1, days + 1):
            predicted_date = datetime.utcnow() + timedelta(days=step)
            predictions.append({
                'date': predicted_date.isoformat(),
                'value': values[-1] + trend * step,
                'metric': metric_name
            })

        self.predictions[metric_name] = predictions
        return predictions

    def _calculate_trend(self, values: List[float]) -> float:
        """Return the least-squares slope of ``values`` per sample index.

        Fewer than two values give a slope of 0.
        """
        if len(values) < 2:
            return 0
        x = np.arange(len(values))
        y = np.array(values)
        # Degree-1 polyfit returns (slope, intercept); only the slope is used.
        return float(np.polyfit(x, y, 1)[0])

    def check_capacity_alerts(self, threshold: float = 0.9) -> List[Dict]:
        """List stored forecasts whose value reaches ``threshold``.

        Returns:
            Dicts with ``metric``, ``date``, ``value`` and ``threshold``,
            highest value first.
        """
        alerts = [
            {
                'metric': metric_name,
                'date': prediction['date'],
                'value': prediction['value'],
                'threshold': threshold
            }
            for metric_name, predictions in self.predictions.items()
            for prediction in predictions
            if prediction['value'] >= threshold
        ]
        return sorted(alerts, key=lambda x: x['value'], reverse=True)

    def get_capacity_recommendations(self) -> List[Dict]:
        """Turn the stored forecasts into scaling recommendations.

        A single ``scale_up`` entry is produced when any alert exists at the
        default threshold, and one ``scale_down`` entry per metric whose
        average forecast is below 0.3. Metrics with an empty forecast are
        skipped, since their average is undefined.
        """
        recommendations = []

        alerts = self.check_capacity_alerts()
        if alerts:
            recommendations.append({
                'type': 'scale_up',
                'message': f"发现 {len(alerts)} 个容量告警,建议扩容",
                'alerts': alerts
            })

        # Look for under-used resources.
        for metric_name, predictions in self.predictions.items():
            if not predictions:
                continue
            avg_value = np.mean([p['value'] for p in predictions])
            if avg_value < 0.3:
                recommendations.append({
                    'type': 'scale_down',
                    'message': f"{metric_name} 平均利用率较低,建议缩容",
                    'metric': metric_name,
                    'avg_value': avg_value
                })

        return recommendations
def run_capacity_prediction():
    """Demonstrate CapacityPredictor on 30 days of synthetic CPU samples.

    Feeds a slowly rising, noisy series into the predictor, prints the
    7-day forecast, then the alerts at a 0.9 threshold and the resulting
    recommendations.
    """
    predictor = CapacityPredictor()

    # Seed the predictor with example history, oldest sample first.
    reference = datetime.utcnow()
    for day in range(30):
        sample_time = reference - timedelta(days=30 - day)
        sample_value = 0.5 + (day / 100) + np.random.normal(0, 0.05)
        predictor.add_metric(sample_time, 'cpu_usage', sample_value)

    # Forecast.
    print("预测 CPU 使用率...")
    forecast = predictor.predict_capacity('cpu_usage', days=7)
    print("\n预测结果:")
    for point in forecast:
        print(f" {point['date']}: {point['value']:.2%}")

    # Alerts.
    print("\n检查容量告警...")
    alerts = predictor.check_capacity_alerts(threshold=0.9)
    if not alerts:
        print("未发现容量告警")
    else:
        print(f"发现 {len(alerts)} 个告警:")
        for alert in alerts:
            print(f" {alert['date']}: {alert['value']:.2%} (阈值: {alert['threshold']:.0%})")

    # Recommendations.
    print("\n获取容量建议...")
    for rec in predictor.get_capacity_recommendations():
        print(f" {rec['type']}: {rec['message']}")


if __name__ == '__main__':
    run_capacity_prediction()
```