16.4 Skills 許可權控制
許可權控制概述
Skills 的許可權控制是確保系統安全和資料保護的重要機制。本節將詳細介紹 Skills 的許可權模型、許可權型別和許可權管理方法。
权限模型
1. 基于角色的访问控制(RBAC)
1.1 角色定义
角色类型
管理员角色
- 完全訪問許可權
- 可以建立、修改、刪除 Skills
- 可以管理使用者和角色
開發者角色
- 可以建立和修改 Skills
- 可以執行 Skills
- 不能刪除其他使用者的 Skills
使用者角色
- 只能執行 Skills
- 不能建立或修改 Skills
- 訪問受限的資源和功能
訪客角色
- 只讀許可權
- 只能檢視 Skill 文件
- 不能執行任何操作
python
#### 1.2 权限定义
python
class Permission:
def __init__(self, name, description):
self.name = name
self.description = description
# Skill 权限
SKILL_CREATE = Permission("skill:create", "创建 Skill")
SKILL_READ = Permission("skill:read", "读取 Skill")
SKILL_UPDATE = Permission("skill:update", "更新 Skill")
SKILL_DELETE = Permission("skill:delete", "删除 Skill")
SKILL_EXECUTE = Permission("skill:execute", "执行 Skill")
# 资源权限
RESOURCE_READ = Permission("resource:read", "读取资源")
RESOURCE_WRITE = Permission("resource:write", "写入资源")
RESOURCE_DELETE = Permission("resource:delete", "删除资源")
# 系统权限
SYSTEM_CONFIG = Permission("system:config", "配置系统")
SYSTEM_MONITOR = Permission("system:monitor", "监控系统")
#### 1.3 角色权限映射
```python
class Role:
def __init__(self, name, permissions):
self.name = name
self.permissions = permissions
# 定義角色
```python
ADMIN = Role("admin", [
SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_DELETE, SKILL_EXECUTE,
RESOURCE_READ, RESOURCE_WRITE, RESOURCE_DELETE,
SYSTEM_CONFIG, SYSTEM_MONITOR
])
DEVELOPER = Role("developer", [
SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_EXECUTE,
RESOURCE_READ, RESOURCE_WRITE
])
USER = Role("user", [
SKILL_READ, SKILL_EXECUTE,
RESOURCE_READ
])
GUEST = Role("guest", [
SKILL_READ
])
~~~
### 2. 基于属性的访问控制(ABAC)
#### 2.1 属性定义
```python
class Attribute:
def __init__(self, name, value):
self.name = name
self.value = value
# 使用者屬性
class UserAttributes:
def __init__(self, user_id, role, department, level):
self.user_id = Attribute("user_id", user_id)
self.role = Attribute("role", role)
self.department = Attribute("department", department)
self.level = Attribute("level", level)
# 資源屬性
class ResourceAttributes:
def __init__(self, resource_id, owner, sensitivity, classification):
self.resource_id = Attribute("resource_id", resource_id)
self.owner = Attribute("owner", owner)
self.sensitivity = Attribute("sensitivity", sensitivity)
self.classification = Attribute("classification", classification)
# 環境屬性
class EnvironmentAttributes:
def __init__(self, time, location, device):
self.time = Attribute("time", time)
self.location = Attribute("location", location)
self.device = Attribute("device", device)
#### 2.2 策略定義
```python
class AccessPolicy:
def __init__(self, name, condition, effect):
self.name = name
self.condition = condition
self.effect = effect
# 示例策略
POLICIES = [
AccessPolicy(
name="admin_full_access",
condition=lambda user, resource, env: user.role.value == "admin",
effect="allow"
),
AccessPolicy(
name="owner_access",
condition=lambda user, resource, env: user.user_id.value == resource.owner.value,
effect="allow"
),
AccessPolicy(
name="sensitivity_restriction",
condition=lambda user, resource, env: resource.sensitivity.value == "high" and user.level.value < 3,
effect="deny"
),
AccessPolicy(
name="business_hours_only",
condition=lambda user, resource, env: not (9 <= env.time.value.hour < 17),
effect="deny"
)
]
~~~
### 3. 权限检查
#### 3.1 RBAC 检查
```python
class RBACChecker:
def __init__(self):
self.user_roles = {}
self.role_permissions = {}
def assign_role(self, user_id, role):
if user_id not in self.user_roles:
self.user_roles[user_id] = []
self.user_roles[user_id].append(role)
def check_permission(self, user_id, permission):
# 獲取使用者角色
roles = self.user_roles.get(user_id, [])
# 檢查每個角色是否有該許可權
for role in roles:
if permission in role.permissions:
return True
return False
def check_skill_permission(self, user_id, skill_id, action):
permission = Permission(f"skill:{action}", "")
return self.check_permission(user_id, permission)
#### 3.2 ABAC 檢查
class ABACChecker:
def __init__(self):
self.policies = []
def add_policy(self, policy):
self.policies.append(policy)
def check_access(self, user, resource, environment):
for policy in self.policies:
if policy.condition(user, resource, environment):
return policy.effect == "allow"
return False
def check_skill_access(self, user, skill, environment):
return self.check_access(user, skill, environment)
~~~
## 資源訪問控制
### 1. 檔案系統訪問
#### 1.1 檔案許可權
```python
class FilePermission:
def __init__(self, path, read, write, execute):
self.path = path
self.read = read
self.write = write
self.execute = execute
class FileAccessController:
def __init__(self):
self.permissions = {}
def set_permission(self, path, read=False, write=False, execute=False):
self.permissions[path] = FilePermission(path, read, write, execute)
def check_read(self, user_id, path):
permission = self.permissions.get(path)
if not permission:
return False
# 检查用户是否有读权限
if permission.read:
return True
# 检查用户是否是文件所有者
if self.is_owner(user_id, path):
return True
return False
def check_write(self, user_id, path):
permission = self.permissions.get(path)
if not permission:
return False
if permission.write:
return True
if self.is_owner(user_id, path):
return True
return False
def is_owner(self, user_id, path):
# 检查用户是否是文件所有者
pass
#### 1.2 目录访问
pythonpython
class DirectoryAccessController:
def __init__(self):
self.access_rules = {}
def set_access_rule(self, directory, allowed_users, allowed_roles):
self.access_rules[directory] = {
"users": allowed_users,
"roles": allowed_roles
}
def check_access(self, user_id, user_roles, directory):
rule = self.access_rules.get(directory)
if not rule:
return False
if user_id in rule["users"]:
return True
for role in user_roles:
if role in rule["roles"]:
return True
return False
### 2\. API 访问控制
#### 2.1 API 权限
pythonpython
class APIPermission:
def __init__(self, endpoint, method, required_permissions):
self.endpoint = endpoint
self.method = method
self.required_permissions = required_permissions
class APIAccessController:
def __init__(self):
self.api_permissions = {}
def register_endpoint(self, endpoint, method, required_permissions):
key = f"{method}:{endpoint}"
self.api_permissions[key] = APIPermission(
endpoint, method, required_permissions
)
def check_access(self, user_id, endpoint, method):
key = f"{method}:{endpoint}"
permission = self.api_permissions.get(key)
if not permission:
return False
# 检查用户是否有所有必需的权限
for required_perm in permission.required_permissions:
if not self.has_permission(user_id, required_perm):
return False
return True
def has_permission(self, user_id, permission):
# 检查用户是否有特定权限
pass
#### 2.2 速率限制
pythonpython
class RateLimiter:
def __init__(self):
self.requests = defaultdict(list)
def check_rate_limit(self, user_id, endpoint, limit, window):
now = datetime.now()
window_start = now - timedelta(seconds=window)
user_requests = self.requests[user_id]
recent_requests = [
req for req in user_requests
if req["timestamp"] >= window_start and req["endpoint"] == endpoint
]
if len(recent_requests) >= limit:
return False
user_requests.append({
"endpoint": endpoint,
"timestamp": now
})
return True
### 3\. 数据访问控制
#### 3.1 数据库访问
pythonpython
class DatabaseAccessController:
def __init__(self):
self.table_permissions = {}
def set_table_permission(self, table, permissions):
self.table_permissions[table] = permissions
def check_select(self, user_id, table):
permission = self.table_permissions.get(table)
if not permission:
return False
return permission.get("select", False)
def check_insert(self, user_id, table):
permission = self.table_permissions.get(table)
if not permission:
return False
return permission.get("insert", False)
def check_update(self, user_id, table):
permission = self.table_permissions.get(table)
if not permission:
return False
return permission.get("update", False)
def check_delete(self, user_id, table):
permission = self.table_permissions.get(table)
if not permission:
return False
return permission.get("delete", False)
#### 3.2 字段级访问
class FieldAccessController:
def __init__(self):
self.field_permissions = {}
def set_field_permission(self, table, field, allowed_roles):
key = f"{table}.{field}"
self.field_permissions[key] = allowed_roles
def check_field_access(self, user_roles, table, field):
key = f"{table}.{field}"
allowed_roles = self.field_permissions.get(key, [])
for role in user_roles:
if role in allowed_roles:
return True
return False
def filter_fields(self, user_roles, table, fields):
accessible_fields = []
for field in fields:
if self.check_field_access(user_roles, table, field):
accessible_fields.append(field)
return accessible_fields
## Skill 权限管理
### 1\. Skill 创建权限
bashpython
python
class SkillCreationManager:
def __init__(self, permission_checker):
self.permission_checker = permission_checker
def can_create_skill(self, user_id, skill_definition):
# 检查用户是否有创建 Skill 的权限
if not self.permission_checker.check_permission(user_id, SKILL_CREATE):
return False
# 检查 Skill 定义是否符合规范
if not self.validate_skill_definition(skill_definition):
return False
# 检查资源访问权限
if not self.check_resource_access(user_id, skill_definition):
return False
return True
def validate_skill_definition(self, definition):
# 验证 Skill 定义
required_fields = ["name", "version", "description"]
for field in required_fields:
if field not in definition:
return False
return True
def check_resource_access(self, user_id, definition):
# 检查 Skill 需要访问的资源
resources = definition.get("resources", [])
for resource in resources:
if not self.permission_checker.check_resource_access(user_id, resource):
return False
return True
```
### 2. Skill 執行許可權
```python
class SkillExecutionManager:
def __init__(self, permission_checker):
self.permission_checker = permission_checker
def can_execute_skill(self, user_id, skill_id, parameters):
if not self.permission_checker.check_permission(user_id, SKILL_EXECUTE):
return False
skill = self.get_skill(skill_id)
if not skill:
return False
if not self.check_skill_specific_permission(user_id, skill):
return False
if not self.check_parameter_permissions(user_id, skill, parameters):
return False
return True
def check_skill_specific_permission(self, user_id, skill):
required_permissions = skill.get_required_permissions()
for permission in required_permissions:
if not self.permission_checker.check_permission(user_id, permission):
return False
return True
def check_parameter_permissions(self, user_id, skill, parameters):
for key, value in parameters.items():
if self.is_resource_reference(value):
if not self.permission_checker.check_resource_access(user_id, value):
return False
return True
def is_resource_reference(self, value):
return isinstance(value, str) and value.startswith("resource://")
### 3\. Skill 修改权限
bashpython
python
class SkillModificationManager:
def __init__(self, permission_checker):
self.permission_checker = permission_checker
def can_modify_skill(self, user_id, skill_id):
# 检查用户是否有修改 Skill 的权限
if not self.permission_checker.check_permission(user_id, SKILL_UPDATE):
return False
# 检查 Skill 是否存在
skill = self.get_skill(skill_id)
if not skill:
return False
# 检查用户是否是 Skill 所有者
if not self.is_skill_owner(user_id, skill):
return False
return True
def is_skill_owner(self, user_id, skill):
return skill.owner_id == user_id
```
### 4. Skill 刪除許可權
```python
class SkillDeletionManager:
def __init__(self, permission_checker):
self.permission_checker = permission_checker
def can_delete_skill(self, user_id, skill_id):
if not self.permission_checker.check_permission(user_id, SKILL_DELETE):
return False
skill = self.get_skill(skill_id)
if not skill:
return False
if not (self.is_skill_owner(user_id, skill) or
self.permission_checker.is_admin(user_id)):
return False
if self.has_dependents(skill_id):
return False
return True
def is_skill_owner(self, user_id, skill):
return skill.owner_id == user_id
def has_dependents(self, skill_id):
pass
```
## 審計日誌
### 1. 訪問日誌
```python
class AccessLogger:
def __init__(self):
self.logs = []
def log_access(self, user_id, resource, action, result):
log_entry = {
"timestamp": datetime.now(),
"user_id": user_id,
"resource": resource,
"action": action,
"result": result
}
self.logs.append(log_entry)
def get_access_logs(self, user_id=None, resource=None, action=None):
filtered_logs = self.logs
if user_id:
filtered_logs = [log for log in filtered_logs if log["user_id"] == user_id]
if resource:
filtered_logs = [log for log in filtered_logs if log["resource"] == resource]
if action:
filtered_logs = [log for log in filtered_logs if log["action"] == action]
return filtered_logs
```
### 2. 許可權變更日誌
```python
class PermissionChangeLogger:
def __init__(self):
self.changes = []
def log_permission_change(self, user_id, target, old_permissions, new_permissions, changed_by):
change_entry = {
"timestamp": datetime.now(),
"user_id": user_id,
"target": target,
"old_permissions": old_permissions,
"new_permissions": new_permissions,
"changed_by": changed_by
}
self.changes.append(change_entry)
def get_permission_changes(self, user_id=None, target=None):
filtered_changes = self.changes
if user_id:
filtered_changes = [change for change in filtered_changes if change["user_id"] == user_id]
if target:
filtered_changes = [change for change in filtered_changes if change["target"] == target]
return filtered_changes
```