19.4 性能与安全对比
19.4.1 性能对比
1. 执行性能
Skills 执行性能
Skills 的执行性能主要受限于 LLM 的推理时间和网络延迟。
性能特征 :
python
python
# Skills 执行时间分解
class SkillPerformanceMetrics:
def __init__(self):
self.metrics = {
"intent_parsing": 0, # 意图解析时间
"parameter_extraction": 0, # 参数提取时间
"skill_execution": 0, # 技能执行时间
"result_generation": 0, # 结果生成时间
"total_time": 0 # 总时间
}
def measure_execution(self, skill, user_input):
"""测量执行时间"""
start_time = time.time()
# 意图解析
intent_start = time.time()
intent = self.parse_intent(user_input)
self.metrics["intent_parsing"] = time.time() - intent_start
# 参数提取
param_start = time.time()
parameters = self.extract_parameters(user_input, intent)
self.metrics["parameter_extraction"] = time.time() - param_start
# 技能执行
exec_start = time.time()
result = skill.execute(parameters)
self.metrics["skill_execution"] = time.time() - exec_start
# 结果生成
gen_start = time.time()
formatted_result = self.format_result(result)
self.metrics["result_generation"] = time.time() - gen_start
self.metrics["total_time"] = time.time() - start_time
return formatted_result
# 典型执行时间(毫秒)
# 意图解析:300-500ms
# 参数提取:200-400ms
# 技能执行:100-300ms
# 结果生成:400-600ms
# 总时间:1000-1800ms性能优化策略 :
python
python
class OptimizedSkill(Skill):
def __init__(self):
super().__init__()
self.cache = {} # 缓存常用请求
self.intent_classifier = self.load_intent_classifier() # 预训练分类器
def parse_intent(self, user_input):
"""优化意图解析"""
# 1. 检查缓存
cache_key = self.get_cache_key(user_input)
if cache_key in self.cache:
return self.cache[cache_key]
# 2. 使用预训练分类器(快速)
intent = self.intent_classifier.predict(user_input)
# 3. 缓存结果
self.cache[cache_key] = intent
return intent
def execute(self, parameters, context):
"""优化执行"""
# 批处理请求
if isinstance(parameters, list):
return self.batch_execute(parameters, context)
# 单个请求执行
return super().execute(parameters, context)
def batch_execute(self, parameters_list, context):
"""批量执行"""
results = []
for parameters in parameters_list:
result = super().execute(parameters, context)
results.append(result)
return results
#### 插件执行性能插件的执行性能主要取决于代码效率和系统资源。
性能特征 :
python
# 插件执行时间分解
class PluginPerformanceMetrics: def **init**(self): self.metrics = { "parameter_validation": 0, # 参数验证时间 "method_execution": 0, # 方法执行时间 "result_processing": 0, # 结果处理时间 "total_time": 0 # 总时间 } def measure_execution(self, plugin, method_name, parameters): """测量执行时间""" start_time = time.time()
# 参数验证
valid_start = time.time() self.validate_parameters(plugin, method_name, parameters) self.metrics["parameter_validation"] = time.time() - valid_start
# 方法执行
exec_start = time.time() result = getattr(plugin, method_name)(**parameters) self.metrics["method_execution"] = time.time() - exec_start
# 结果处理
proc_start = time.time() formatted_result = self.process_result(result) self.metrics["result_processing"] = time.time() - proc_start self.metrics["total_time"] = time.time() - start_time return formatted_result
# 典型执行时间(毫秒)
# 参数验证:5-15ms
# 方法执行:20-100ms
# 结果处理:5-10ms
# 总时间:30-125ms
bash
**性能优化策略**:
```python
```python
class OptimizedPlugin(Plugin):
def __init__(self):
super().__init__()
self.cache = LRUCache(maxsize=1000) # LRU 缓存
self.connection_pool = ConnectionPool(size=10) # 连接池
self.thread_pool = ThreadPoolExecutor(max_workers=4) # 线程池
def execute(self, method_name, parameters):
"""优化执行"""
# 1. 检查缓存
cache_key = self.get_cache_key(method_name, parameters)
cached_result = self.cache.get(cache_key)
if cached_result:
return cached_result
# 2. 使用连接池
with self.connection_pool.get_connection() as conn:
# 3. 异步执行
future = self.thread_pool.submit(
self._execute_method,
method_name,
parameters,
conn
)
result = future.result(timeout=10)
# 4. 缓存结果
self.cache.set(cache_key, result)
return result
def _execute_method(self, method_name, parameters, conn):
"""执行方法"""
method = getattr(self, method_name)
return method(**parameters, connection=conn)
### 2. 并发性能
#### Skills 并发性能
Skills 的并发性能受限于 LLM API 的并发限制。
class SkillConcurrencyManager:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.request_queue = asyncio.Queue()
async def execute_skill(self, skill, user_input):
"""异步执行技能"""
async with self.semaphore:
return await skill.execute_async(user_input)
async def batch_execute(self, skill, user_inputs):
"""批量执行"""
tasks = [
self.execute_skill(skill, user_input)
for user_input in user_inputs
]
return await asyncio.gather(*tasks)
# 性能指标
# 单个请求:1000-1800ms
# 并发 5 个:1200-2000ms(每个)
# 并发 10 个:1500-2500ms(每个)
# 并发 20 个:2000-3500ms(每个)
bashpython
#### 插件并发性能
插件的并发性能主要取决于系统资源和代码设计。
```python
class PluginConcurrencyManager:
def __init__(self, max_workers=10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.async_executor = ProcessPoolExecutor(max_workers=max_workers)
def execute_plugin(self, plugin, method_name, parameters):
"""同步执行"""
future = self.executor.submit(
self._execute_sync,
plugin,
method_name,
parameters
)
return future.result(timeout=30)
async def execute_plugin_async(self, plugin, method_name, parameters):
"""异步执行"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
self._execute_sync,
plugin,
method_name,
parameters
)
def batch_execute(self, plugin, method_name, parameters_list):
"""批量执行"""
futures = [
self.executor.submit(
self._execute_sync,
plugin,
method_name,
parameters
)
for parameters in parameters_list
]
return [future.result() for future in futures]
# 性能指标
# 单个请求:30-125ms
# 并发 10 个:40-150ms(每个)
# 并发 50 个:60-200ms(每个)
# 并发 100 个:100-300ms(每个)
### 3\. 资源消耗
#### Skills 资源消耗
class SkillResourceMonitor: def **init**(self): self.metrics = { "cpu_usage": [], # CPU 使用率 "memory_usage": [], # 内存使用 "network_io": [], # 网络 I/O "api_calls": 0 # API 调用次数 } def monitor_execution(self, skill, user_input): """监控执行过程""" import psutil
# 记录初始状态
process = psutil.Process() initial_cpu = process.cpu_percent() initial_memory = process.memory_info().rss
# 执行技能
result = skill.execute(user_input)
# 记录最终状态
final_cpu = process.cpu_percent() final_memory = process.memory_info().rss
# 计算资源消耗
self.metrics["cpu_usage"].append(final_cpu - initial_cpu) self.metrics["memory_usage"].append(final_memory - initial_memory) self.metrics["api_calls"] += 1 return result
# 典型资源消耗
# CPU 使用率:10-30%
# 内存使用:100-500MB
# 网络 I/O:1-5MB/请求
# API 调用:2-4 次/请求
bash#### 插件资源消耗
```python
python
class PluginResourceMonitor:
def __init__(self):
self.metrics = {
"cpu_usage": [], # CPU 使用率
"memory_usage": [], # 内存使用
"disk_io": [], # 磁盘 I/O
"network_io": [] # 网络 I/O
}
def monitor_execution(self, plugin, method_name, parameters):
"""监控执行过程"""
import psutil
# 记录初始状态
process = psutil.Process()
initial_cpu = process.cpu_percent()
initial_memory = process.memory_info().rss
initial_disk_io = process.io_counters()
initial_network_io = psutil.net_io_counters()
# 执行插件方法
result = getattr(plugin, method_name)(**parameters)
# 记录最终状态
final_cpu = process.cpu_percent()
final_memory = process.memory_info().rss
final_disk_io = process.io_counters()
final_network_io = psutil.net_io_counters()
# 计算资源消耗
self.metrics["cpu_usage"].append(final_cpu - initial_cpu)
self.metrics["memory_usage"].append(final_memory - initial_memory)
self.metrics["disk_io"].append(
final_disk_io.write_bytes - initial_disk_io.write_bytes
)
self.metrics["network_io"].append(
final_network_io.bytes_sent - initial_network_io.bytes_sent
)
return result
# 典型资源消耗
# CPU 使用率:5-15%
# 内存使用:50-200MB
# 磁盘 I/O:100KB-10MB/请求
# 网络 I/O:100KB-1MB/请求
### 4\. 性能对比表性能指标| Skills| 插件
---|---|---
单次执行时间| 1000-1800ms| 30-125ms
并发能力| 低(5-10)| 高(50-100+)
CPU 使用率| 10-30%| 5-15%
内存使用| 100-500MB| 50-200MB
网络 I/O| 1-5MB/请求| 100KB-1MB/请求
可扩展性| 中| 高
吞吐量| 低| 高
延迟| 高| 低
## 19.4.2 安全对比
### 1\. 输入安全
#### Skills 输入安全
Skills 需要防范提示注入攻击和敏感信息泄露。
class SkillInputSecurity: def **init**(self): self.injection_patterns = [ r"ignore.*instructions", r"forget.*previous", r"new.*instructions", r"override.*system" ] self.sensitive_keywords = [ "password", "token", "api_key", "secret", "credential" ] def sanitize_input(self, user_input): """清理输入""" import re
# 1\. 检测注入攻击
for pattern in self.injection_patterns: if re.search(pattern, user_input, re.IGNORECASE): raise SecurityError("检测到潜在的注入攻击")
# 2\. 移除敏感信息
sanitized = user_input for keyword in self.sensitive_keywords:
# 使用占位符替换敏感信息
pattern = rf"{keyword}\s*[:=]\s*[^\s]+" sanitized = re.sub(pattern, f"{keyword}=_**REDACTED**_ ", sanitized, flags=re.IGNORECASE)
# 3\. 限制输入长度
max_length = 10000 if len(sanitized) > max_length: sanitized = sanitized[:max_length] return sanitized def validate_input(self, user_input): """验证输入"""
# 检查恶意代码
if self.contains_malicious_code(user_input): raise SecurityError("输入包含恶意代码")
# 检查特殊字符
if self.contains_dangerous_characters(user_input): raise SecurityError("输入包含危险字符") return True
bash
#### 插件输入安全
插件通过类型系统和验证规则确保输入安全。
```python
```python
from pydantic import BaseModel, validator, constr
from typing import Optional
class SecureInput(BaseModel):
"""安全输入模型"""
username: constr(min_length=3, max_length=50)
email: str
message: constr(max_length=1000)
@validator('username')
def username_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('用户名只能包含字母和数字')
return v
@validator('email')
def email_valid(cls, v):
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
raise ValueError('邮箱格式无效')
return v
@validator('message')
def message_safe(cls, v):
# 检查 XSS 攻击
dangerous_patterns = ['<script>', 'javascript:', 'onerror=']
for pattern in dangerous_patterns:
if pattern.lower() in v.lower():
raise ValueError('消息包含潜在危险内容')
return v
class PluginInputSecurity:
def __init__(self):
self.input_model = SecureInput
def validate_input(self, parameters):
"""验证输入"""
try:
validated = self.input_model(**parameters)
return validated.dict()
except Exception as e:
raise SecurityError(f"输入验证失败: {str(e)}")
### 2. 输出安全
#### Skills 输出安全
Skills 需要防止输出敏感信息和有害内容。
class SkillOutputSecurity:
def __init__(self):
self.sensitive_patterns = [
r"password\s*[:=]\s*\S+",
r"token\s*[:=]\s*\S+",
r"api[_-]?key\s*[:=]\s*\S+"
]
self.harmful_categories = [
"violence", "hate", "sexual",
"self-harm", "illegal"
]
def sanitize_output(self, output):
"""清理输出"""
import re
# 1. 移除敏感信息
sanitized = output
for pattern in self.sensitive_patterns:
sanitized = re.sub(pattern, "***REDACTED***", sanitized, flags=re.IGNORECASE)
# 2. 检查有害内容
if self.contains_harmful_content(sanitized):
sanitized = self.add_warning(sanitized, "内容可能包含有害信息")
# 3. 限制输出长度
max_length = 50000
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "\n\n[输出已截断]"
return sanitized
def contains_harmful_content(self, text):
"""检查有害内容"""
# 使用内容审核 API
# 这里简化实现
harmful_keywords = ["暴力", "仇恨", "非法"] for keyword in harmful_keywords: if keyword in text: return True return False def add_warning(self, text, warning): """添加警告""" return f"[警告: {warning}]\n\n{text}"
bash
#### 插件输出安全
插件通过数据过滤和编码确保输出安全。
```python
```python
class PluginOutputSecurity:
def __init__(self):
self.html_escape_map = {
'&': '&',
'<': '<',
'>': '>',
'"': '"',
"'": '''
}
def sanitize_output(self, output):
"""清理输出"""
# 1. HTML 转义
if isinstance(output, str):
output = self.html_escape(output)
# 2. JSON 编码
if isinstance(output, dict):
output = self.json_encode(output)
# 3. 移除敏感字段
if isinstance(output, dict):
output = self.remove_sensitive_fields(output)
return output
def html_escape(self, text):
"""HTML 转义"""
result = []
for char in text:
result.append(self.html_escape_map.get(char, char))
return ''.join(result)
def json_encode(self, data):
"""JSON 编码"""
import json
return json.dumps(data, ensure_ascii=False)
def remove_sensitive_fields(self, data):
"""移除敏感字段"""
sensitive_fields = ['password', 'token', 'secret']
return {
k: v for k, v in data.items()
if k not in sensitive_fields
}
### 3. 访问控制
#### Skills 访问控制
Skills 的访问控制相对宽松,主要依赖 LLM 的理解能力。
class SkillAccessControl:
def __init__(self):
self.permissions = {
"admin": ["all"],
"user": ["read", "write"],
"guest": ["read"]
}
def check_permission(self, user, skill_name, action):
"""检查权限"""
user_role = user.get("role", "guest")
user_permissions = self.permissions.get(user_role, [])
if "all" in user_permissions:
return True
if action in user_permissions:
return True
raise PermissionError(f"用户 {user['username']} 无权限执行 {action}")
def filter_sensitive_data(self, user, data):
"""过滤敏感数据"""
user_role = user.get("role", "guest")
if user_role == "admin":
return data
# 移除敏感字段
filtered = data.copy()
sensitive_fields = ["internal_notes", "admin_only"]
for field in sensitive_fields:
filtered.pop(field, None)
return filtered
#### 插件访问控制插件可以实现细粒度的访问控制。
python
from functools import wraps
class PluginAccessControl:
def __init__(self):
self.roles = {
"admin": {"priority": 100},
"user": {"priority": 50},
"guest": {"priority": 10}
}
self.permissions = {}
def require_permission(self, permission):
"""权限装饰器"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
user = kwargs.get('user', None)
if not user:
raise PermissionError("需要用户认证")
if not self.has_permission(user, permission):
raise PermissionError(f"缺少权限: {permission}")
return func(self, *args, **kwargs)
return wrapper
return decorator
def has_permission(self, user, permission):
"""检查权限"""
user_role = user.get("role", "guest")
role_info = self.roles.get(user_role, {})
user_permissions = self.permissions.get(user_role, set())
return permission in user_permissions or "all" in user_permissions
def add_permission(self, role, permission):
"""添加权限"""
if role not in self.permissions:
self.permissions[role] = set()
self.permissions[role].add(permission)
# 使用示例
class SecurePlugin(Plugin):
def __init__(self):
super().__init__()
self.access_control = PluginAccessControl()
@PluginAccessControl.require_permission("read")
def read_data(self, user, data_id):
"""读取数据"""
return self._read_data(data_id)
@PluginAccessControl.require_permission("write")
def write_data(self, user, data_id, data):
"""写入数据"""
return self._write_data(data_id, data)
### 4. 审计日志
#### Skills 审计日志
class SkillAuditLogger:
def __init__(self):
self.logs = []
def log_execution(self, skill_name, user_input, result, user):
"""记录执行"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"skill_name": skill_name,
"user": user.get("username", "anonymous"),
"input": self.sanitize_input(user_input),
"output": self.sanitize_output(result),
"success": result.get("success", False),
"duration": result.get("duration", 0)
}
self.logs.append(log_entry)
def get_logs(self, user=None, skill_name=None, start_time=None, end_time=None):
"""获取日志"""
filtered_logs = self.logs
if user:
filtered_logs = [log for log in filtered_logs if log["user"] == user]
if skill_name:
filtered_logs = [log for log in filtered_logs if log["skill_name"] == skill_name]
if start_time:
filtered_logs = [log for log in filtered_logs if log["timestamp"] >= start_time]
if end_time:
filtered_logs = [log for log in filtered_logs if log["timestamp"] <= end_time]
return filtered_logs
#### 插件审计日志
python
class PluginAuditLogger:
def __init__(self):
self.logs = []
self.max_logs = 10000
def log_execution(self, plugin_name, method_name, parameters, result, user):
"""记录执行"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"plugin_name": plugin_name,
"method_name": method_name,
"user": user.get("username", "anonymous"),
"parameters": self.sanitize_parameters(parameters),
"result": self.sanitize_result(result),
"success": result.get("success", False),
"duration": result.get("duration", 0),
"ip_address": user.get("ip_address", "unknown")
}
self.logs.append(log_entry)
# 限制日志数量
if len(self.logs) > self.max_logs:
self.logs = self.logs[-self.max_logs:]
def export_logs(self, format="json"):
"""导出日志"""
if format == "json":
import json
return json.dumps(self.logs, indent=2)
elif format == "csv":
import csv
import io
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=self.logs[0].keys())
writer.writeheader()
writer.writerows(self.logs)
return output.getvalue()
### 5\. 安全对比表
安全特性| Skills| 插件
---|---|---
输入验证| 弱| 强
输出过滤| 中| 强
访问控制| 弱| 强
审计日志| 中| 强
类型安全| 无| 强
注入防护| 弱| 强
敏感信息保护| 中| 强
代码审查| 困难| 容易
安全测试| 困难| 容易
合规性| 中| 强