
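# 16.4 Skills Permission Control

## Overview

Permission control for Skills is a key mechanism for keeping the system secure and protecting data. This section describes the Skills permission model, the permission types, and how permissions are managed.

## Permission Model

### 1. Role-Based Access Control (RBAC)

#### 1.1 Role Definitions

Role types

Administrator

  • Full access
  • Can create, modify, and delete Skills
  • Can manage users and roles

Developer

  • Can create and modify Skills
  • Can execute Skills
  • Cannot delete other users' Skills

User

  • Can only execute Skills
  • Cannot create or modify Skills
  • Has access to a restricted set of resources and features

Guest

  • Read-only access
  • Can only view Skill documentation
  • Cannot execute any operation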
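
#### 1.2 Permission Definitions

```python
class Permission:
    """A named permission, for example "skill:create"."""

    def __init__(self, name, description):
        self.name = name
        self.description = description

    # Two Permission objects with the same name are treated as equal,
    # so membership tests such as `perm in role.permissions` work by value.
    def __eq__(self, other):
        return isinstance(other, Permission) and self.name == other.name

    def __hash__(self):
        return hash(self.name)

# Skill permissions
SKILL_CREATE = Permission("skill:create", "Create a Skill")
SKILL_READ = Permission("skill:read", "Read a Skill")
SKILL_UPDATE = Permission("skill:update", "Update a Skill")
SKILL_DELETE = Permission("skill:delete", "Delete a Skill")
SKILL_EXECUTE = Permission("skill:execute", "Execute a Skill")

# Resource permissions
RESOURCE_READ = Permission("resource:read", "Read a resource")
RESOURCE_WRITE = Permission("resource:write", "Write a resource")
RESOURCE_DELETE = Permission("resource:delete", "Delete a resource")

# System permissions
SYSTEM_CONFIG = Permission("system:config", "Configure the system")
SYSTEM_MONITOR = Permission("system:monitor", "Monitor the system")
```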

#### 1.3 Role-to-Permission Mapping

```python
class Role:
    def __init__(self, name, permissions):
        self.name = name
        self.permissions = permissions

# Role definitions
ADMIN = Role("admin", [
    SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_DELETE, SKILL_EXECUTE,
    RESOURCE_READ, RESOURCE_WRITE, RESOURCE_DELETE,
    SYSTEM_CONFIG, SYSTEM_MONITOR
])
DEVELOPER = Role("developer", [
    SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_EXECUTE,
    RESOURCE_READ, RESOURCE_WRITE
])
USER = Role("user", [
    SKILL_READ, SKILL_EXECUTE,
    RESOURCE_READ
])
GUEST = Role("guest", [
    SKILL_READ
])
```

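A role's permission list is usually queried with an `in` test, which only behaves as expected when permissions compare by value rather than by object identity. The following is a minimal sketch of that idea, assuming a frozen dataclass stands in for `Permission`. The names `Perm`, `ROLE_PERMS`, and `role_allows` are hypothetical and are not part of the code above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Perm:
    """Hypothetical stand-in for Permission; frozen dataclasses compare and hash by value."""
    name: str


# Hypothetical role table keyed by role name (Skill permissions only).
ROLE_PERMS = {
    "admin": frozenset({Perm("skill:create"), Perm("skill:read"), Perm("skill:update"),
                        Perm("skill:delete"), Perm("skill:execute")}),
    "developer": frozenset({Perm("skill:create"), Perm("skill:read"),
                            Perm("skill:update"), Perm("skill:execute")}),
    "user": frozenset({Perm("skill:read"), Perm("skill:execute")}),
    "guest": frozenset({Perm("skill:read")}),
}


def role_allows(role_name, action):
    """Return True if the role grants "skill:<action>"; unknown roles grant nothing."""
    return Perm(f"skill:{action}") in ROLE_PERMS.get(role_name, frozenset())


print(role_allows("developer", "update"))  # True
print(role_allows("guest", "execute"))     # False
```

Because `Perm("skill:update")` is built fresh for each lookup, the check succeeds only because equality and hashing are based on the `name` field.
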
### 2. Attribute-Based Access Control (ABAC)

#### 2.1 Attribute Definitions

```python
class Attribute:
    def __init__(self, name, value):
        self.name = name
        self.value = value

# User attributes
class UserAttributes:
    def __init__(self, user_id, role, department, level):
        self.user_id = Attribute("user_id", user_id)
        self.role = Attribute("role", role)
        self.department = Attribute("department", department)
        self.level = Attribute("level", level)

# Resource attributes
class ResourceAttributes:
    def __init__(self, resource_id, owner, sensitivity, classification):
        self.resource_id = Attribute("resource_id", resource_id)
        self.owner = Attribute("owner", owner)
        self.sensitivity = Attribute("sensitivity", sensitivity)
        self.classification = Attribute("classification", classification)

# Environment attributes
class EnvironmentAttributes:
    def __init__(self, time, location, device):
        self.time = Attribute("time", time)
        self.location = Attribute("location", location)
        self.device = Attribute("device", device)
```

#### 2.2 Policy Definitions

```python
class AccessPolicy:
    def __init__(self, name, condition, effect):
        self.name = name
        self.condition = condition  # callable(user, resource, env) -> bool
        self.effect = effect        # "allow" or "deny"

# Example policies
POLICIES = [
    # Administrators are always allowed
    AccessPolicy(
        name="admin_full_access",
        condition=lambda user, resource, env: user.role.value == "admin",
        effect="allow"
    ),
    # The owner of a resource is allowed
    AccessPolicy(
        name="owner_access",
        condition=lambda user, resource, env: user.user_id.value == resource.owner.value,
        effect="allow"
    ),
    # Highly sensitive resources are denied to users below level 3
    AccessPolicy(
        name="sensitivity_restriction",
        condition=lambda user, resource, env: resource.sensitivity.value == "high" and user.level.value < 3,
        effect="deny"
    ),
    # Access outside 09:00-17:00 is denied
    AccessPolicy(
        name="business_hours_only",
        condition=lambda user, resource, env: not (9 <= env.time.value.hour < 17),
        effect="deny"
    )
]
```

### 3. Permission Checks

#### 3.1 RBAC Check

```python
class RBACChecker:
    def __init__(self):
        self.user_roles = {}
        self.role_permissions = {}

    def assign_role(self, user_id, role):
        self.user_roles.setdefault(user_id, []).append(role)

    def check_permission(self, user_id, permission):
        # Look up the roles assigned to the user
        roles = self.user_roles.get(user_id, [])

        # Grant access if any role carries a permission with the same name.
        # Comparing names (not object identity) lets a freshly built
        # Permission, as in check_skill_permission, match the role's entry.
        for role in roles:
            if any(p.name == permission.name for p in role.permissions):
                return True

        return False

    def check_skill_permission(self, user_id, skill_id, action):
        # skill_id is accepted for per-Skill checks but is not used
        # by this role-level check
        permission = Permission(f"skill:{action}", "")
        return self.check_permission(user_id, permission)
```

#### 3.2 ABAC Check

```python
class ABACChecker:
    def __init__(self):
        self.policies = []

    def add_policy(self, policy):
        self.policies.append(policy)

    def check_access(self, user, resource, environment):
        # First-match semantics: the first policy whose condition holds
        # decides the outcome, so registration order matters.
        for policy in self.policies:
            if policy.condition(user, resource, environment):
                return policy.effect == "allow"
        # No policy matched: deny by default
        return False

    def check_skill_access(self, user, skill, environment):
        return self.check_access(user, skill, environment)
```
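
With first-match evaluation, as in `ABACChecker.check_access`, an allow policy registered early (such as `owner_access`) decides the outcome before a later deny policy (such as `business_hours_only`) is ever consulted. One common alternative is deny-overrides, where any matching deny wins. The following is a minimal sketch of that alternative, assuming policies are plain `(name, condition, effect)` tuples and attributes are simple objects. `check_access_deny_overrides` and the sample data are hypothetical and not part of the code above.

```python
from datetime import datetime
from types import SimpleNamespace

# Hypothetical, simplified policies: (name, condition, effect).
POLICIES = [
    ("owner_access", lambda u, r, e: u.user_id == r.owner, "allow"),
    ("business_hours_only", lambda u, r, e: not (9 <= e.time.hour < 17), "deny"),
]


def check_access_deny_overrides(policies, user, resource, env):
    """Allow only if at least one allow policy matches and no deny policy matches."""
    allowed = False
    for _name, condition, effect in policies:
        if condition(user, resource, env):
            if effect == "deny":
                return False  # any matching deny wins immediately
            allowed = True
    return allowed


owner = SimpleNamespace(user_id="u1")
doc = SimpleNamespace(owner="u1")
daytime = SimpleNamespace(time=datetime(2024, 1, 8, 10, 0))
night = SimpleNamespace(time=datetime(2024, 1, 8, 23, 0))

print(check_access_deny_overrides(POLICIES, owner, doc, daytime))  # True
print(check_access_deny_overrides(POLICIES, owner, doc, night))    # False
```

Under deny-overrides the order of the policy list no longer affects the result, which makes the policy set easier to reason about as it grows.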

## Resource Access Control

### 1. File System Access

#### 1.1 File Permissions

```python
class FilePermission:
    def __init__(self, path, read, write, execute):
        self.path = path
        self.read = read
        self.write = write
        self.execute = execute

class FileAccessController:
    def __init__(self):
        self.permissions = {}

    def set_permission(self, path, read=False, write=False, execute=False):
        self.permissions[path] = FilePermission(path, read, write, execute)

    def check_read(self, user_id, path):
        permission = self.permissions.get(path)
        if not permission:
            # No rule registered for this path: deny by default
            return False

        # Allow if the path is marked readable
        if permission.read:
            return True

        # Otherwise allow only the file owner
        return bool(self.is_owner(user_id, path))

    def check_write(self, user_id, path):
        permission = self.permissions.get(path)
        if not permission:
            return False

        # Allow if the path is marked writable
        if permission.write:
            return True

        # Otherwise allow only the file owner
        return bool(self.is_owner(user_id, path))

    def is_owner(self, user_id, path):
        # Placeholder: the ownership lookup is not specified here.
        # Returns False so that nobody is treated as the owner by default.
        return False
```

#### 1.2 Directory Access

```python
class DirectoryAccessController:
    def __init__(self):
        # directory -> {"users": [...], "roles": [...]}
        self.access_rules = {}

    def set_access_rule(self, directory, allowed_users, allowed_roles):
        self.access_rules[directory] = {
            "users": allowed_users,
            "roles": allowed_roles
        }

    def check_access(self, user_id, user_roles, directory):
        rule = self.access_rules.get(directory)
        if not rule:
            # No rule for this directory: deny by default
            return False
        # Allow users who are listed explicitly
        if user_id in rule["users"]:
            return True
        # Otherwise allow if any of the user's roles is listed
        return any(role in rule["roles"] for role in user_roles)
```
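
`check_access` matches only the exact directory string, so a rule on `/skills` does not cover `/skills/team-a`. If rules should be inherited by subdirectories, one option is to walk up the path until a rule is found. The following is a minimal sketch under that assumption, using `pathlib.PurePosixPath`. The function `find_rule` and the sample rules are hypothetical.

```python
from pathlib import PurePosixPath

# Hypothetical rules in the same shape as DirectoryAccessController.access_rules.
ACCESS_RULES = {
    "/skills": {"users": ["alice"], "roles": ["developer"]},
    "/skills/private": {"users": ["bob"], "roles": []},
}


def find_rule(rules, directory):
    """Return the rule of the directory itself or its nearest ancestor, else None."""
    path = PurePosixPath(directory)
    for candidate in (path, *path.parents):
        rule = rules.get(str(candidate))
        if rule is not None:
            return rule
    return None


def check_access(rules, user_id, user_roles, directory):
    rule = find_rule(rules, directory)
    if rule is None:
        return False  # no rule anywhere up the tree: deny
    return user_id in rule["users"] or any(r in rule["roles"] for r in user_roles)


print(check_access(ACCESS_RULES, "carol", ["developer"], "/skills/team-a"))   # True
print(check_access(ACCESS_RULES, "carol", ["developer"], "/skills/private"))  # False
```

The more specific rule on `/skills/private` takes precedence over the inherited one. This sketch does not resolve `..` segments, so paths should be normalized before the lookup.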

### 2. API Access Control

#### 2.1 API Permissions

```python
class APIPermission:
    def __init__(self, endpoint, method, required_permissions):
        self.endpoint = endpoint
        self.method = method
        self.required_permissions = required_permissions

class APIAccessController:
    def __init__(self):
        # "METHOD:endpoint" -> APIPermission
        self.api_permissions = {}

    def register_endpoint(self, endpoint, method, required_permissions):
        key = f"{method}:{endpoint}"
        self.api_permissions[key] = APIPermission(
            endpoint, method, required_permissions
        )

    def check_access(self, user_id, endpoint, method):
        key = f"{method}:{endpoint}"
        permission = self.api_permissions.get(key)

        if not permission:
            # Unregistered endpoints are denied by default
            return False

        # The user must hold every required permission
        for required_perm in permission.required_permissions:
            if not self.has_permission(user_id, required_perm):
                return False

        return True

    def has_permission(self, user_id, permission):
        # Placeholder: the per-user permission lookup is not specified here.
        # Returns False so that access is denied by default.
        return False
```
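
One way to enforce such checks at the call site is a decorator that wraps each handler. The following is a minimal sketch, assuming permissions are plain strings and grants live in an in-memory dictionary. `USER_PERMISSIONS`, `PermissionDenied`, `require_permissions`, and `run_skill` are hypothetical names introduced only for illustration.

```python
from functools import wraps

# Hypothetical in-memory grants: user_id -> set of permission names.
USER_PERMISSIONS = {
    "alice": {"skill:read", "skill:execute"},
    "bob": {"skill:read"},
}


class PermissionDenied(Exception):
    """Raised when the caller lacks a required permission."""


def require_permissions(*required):
    """Reject the call unless the user holds every required permission."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(user_id, *args, **kwargs):
            granted = USER_PERMISSIONS.get(user_id, set())
            missing = set(required) - granted
            if missing:
                raise PermissionDenied(f"{user_id} lacks {sorted(missing)}")
            return handler(user_id, *args, **kwargs)
        return wrapper
    return decorator


@require_permissions("skill:read", "skill:execute")
def run_skill(user_id, skill_id):
    return f"{user_id} ran {skill_id}"


print(run_skill("alice", "summarize"))  # alice ran summarize
```

Calling `run_skill("bob", "summarize")` raises `PermissionDenied`, because `bob` holds only `skill:read`.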

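#### 2.2 Rate Limiting

```python
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self):
        # user_id -> list of {"endpoint": ..., "timestamp": ...} records
        self.requests = defaultdict(list)

    def check_rate_limit(self, user_id, endpoint, limit, window):
        """Allow at most `limit` requests per `window` seconds for each user and endpoint."""
        now = datetime.now()
        window_start = now - timedelta(seconds=window)
        user_requests = self.requests[user_id]
        # Collect this user's requests to the endpoint inside the window
        recent_requests = [
            req for req in user_requests
            if req["timestamp"] >= window_start and req["endpoint"] == endpoint
        ]
        if len(recent_requests) >= limit:
            return False
        # Record the accepted request
        user_requests.append({
            "endpoint": endpoint,
            "timestamp": now
        })
        return True
```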
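
`RateLimiter` never discards old records, so each user's list grows without bound. The following is a minimal sketch of one way to bound it, assuming a `deque` per (user, endpoint) pair and a monotonic clock. The class name `SlidingWindowLimiter` and the injectable `clock` parameter are hypothetical.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit      # maximum requests per window
        self.window = window    # window length in seconds
        self.clock = clock      # injectable so tests can control time
        self.hits = defaultdict(deque)  # (user_id, endpoint) -> timestamps

    def allow(self, user_id, endpoint):
        now = self.clock()
        hits = self.hits[(user_id, endpoint)]
        # Drop timestamps that have left the window
        while hits and hits[0] < now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


# Usage with a fake clock so the behaviour is deterministic
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60, clock=lambda: fake_now[0])
results = [limiter.allow("alice", "/run") for _ in range(3)]
print(results)  # [True, True, False]

fake_now[0] = 61.0
print(limiter.allow("alice", "/run"))  # True
```

Only accepted requests are stored, and expired timestamps are removed on every call, so memory use per key is capped at `limit` entries.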

### 3. Data Access Control

#### 3.1 Database Access

```python
class DatabaseAccessController:
    def __init__(self):
        # table -> {"select": bool, "insert": bool, "update": bool, "delete": bool}
        self.table_permissions = {}

    def set_table_permission(self, table, permissions):
        self.table_permissions[table] = permissions

    def _check(self, table, operation):
        # Unknown tables and unlisted operations are denied by default
        permission = self.table_permissions.get(table)
        if not permission:
            return False
        return permission.get(operation, False)

    # user_id is accepted for interface consistency; in this simplified
    # example, permissions are defined per table rather than per user.
    def check_select(self, user_id, table):
        return self._check(table, "select")

    def check_insert(self, user_id, table):
        return self._check(table, "insert")

    def check_update(self, user_id, table):
        return self._check(table, "update")

    def check_delete(self, user_id, table):
        return self._check(table, "delete")
```

#### 3.2 Field-Level Access

```python
class FieldAccessController:
    def __init__(self):
        # "table.field" -> roles allowed to access the field
        self.field_permissions = {}

    def set_field_permission(self, table, field, allowed_roles):
        key = f"{table}.{field}"
        self.field_permissions[key] = allowed_roles

    def check_field_access(self, user_roles, table, field):
        key = f"{table}.{field}"
        # Fields without a rule are accessible to nobody
        allowed_roles = self.field_permissions.get(key, [])
        return any(role in allowed_roles for role in user_roles)

    def filter_fields(self, user_roles, table, fields):
        # Keep only the fields that at least one of the roles may access
        return [
            field for field in fields
            if self.check_field_access(user_roles, table, field)
        ]
```
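
Field-level rules are typically applied when a row is returned to the caller. The following is a minimal sketch, assuming rows are plain dictionaries and rules map `table.field` keys to allowed role names, in the same shape as `field_permissions`. `FIELD_PERMISSIONS`, `redact_row`, and the sample data are hypothetical.

```python
# Hypothetical rules: "table.field" -> roles allowed to read the field.
FIELD_PERMISSIONS = {
    "users.name": ["admin", "developer", "user"],
    "users.email": ["admin", "developer"],
    "users.salary": ["admin"],
}


def redact_row(user_roles, table, row):
    """Return a copy of row containing only the fields the roles may read."""
    visible = {}
    for field, value in row.items():
        allowed_roles = FIELD_PERMISSIONS.get(f"{table}.{field}", [])
        if any(role in allowed_roles for role in user_roles):
            visible[field] = value
    return visible


row = {"name": "Ada", "email": "ada@example.com", "salary": 100, "notes": "n/a"}
print(redact_row(["developer"], "users", row))
# {'name': 'Ada', 'email': 'ada@example.com'}
```

The `notes` field has no rule, so it is hidden from every role. Denying by default keeps newly added columns private until a rule is defined for them.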

## Skill Permission Management

### 1. Skill Creation Permission

```python
class SkillCreationManager:
    def __init__(self, permission_checker):
        # permission_checker is expected to provide check_permission()
        # and check_resource_access()
        self.permission_checker = permission_checker

    def can_create_skill(self, user_id, skill_definition):
        # The user must hold the Skill creation permission
        if not self.permission_checker.check_permission(user_id, SKILL_CREATE):
            return False

        # The Skill definition must be well-formed
        if not self.validate_skill_definition(skill_definition):
            return False

        # The user must be able to access every resource the Skill declares
        if not self.check_resource_access(user_id, skill_definition):
            return False

        return True

    def validate_skill_definition(self, definition):
        # A definition must include these fields
        required_fields = ["name", "version", "description"]
        return all(field in definition for field in required_fields)

    def check_resource_access(self, user_id, definition):
        # Check each resource the Skill needs to access
        resources = definition.get("resources", [])
        return all(
            self.permission_checker.check_resource_access(user_id, resource)
            for resource in resources
        )
```

### 2. Skill Execution Permission

```python
class SkillExecutionManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_execute_skill(self, user_id, skill_id, parameters):
        # The user must hold the general Skill execution permission
        if not self.permission_checker.check_permission(user_id, SKILL_EXECUTE):
            return False
        # The Skill must exist
        skill = self.get_skill(skill_id)
        if not skill:
            return False
        # The user must hold every permission this Skill requires
        if not self.check_skill_specific_permission(user_id, skill):
            return False
        # The user must be able to access every resource passed as a parameter
        if not self.check_parameter_permissions(user_id, skill, parameters):
            return False
        return True

    def get_skill(self, skill_id):
        # Placeholder: the Skill lookup is not specified in this section
        raise NotImplementedError

    def check_skill_specific_permission(self, user_id, skill):
        required_permissions = skill.get_required_permissions()
        return all(
            self.permission_checker.check_permission(user_id, permission)
            for permission in required_permissions
        )

    def check_parameter_permissions(self, user_id, skill, parameters):
        for key, value in parameters.items():
            if self.is_resource_reference(value):
                if not self.permission_checker.check_resource_access(user_id, value):
                    return False
        return True

    def is_resource_reference(self, value):
        # Resource references are strings of the form "resource://..."
        return isinstance(value, str) and value.startswith("resource://")
```

### 3. Skill Modification Permission

```python
class SkillModificationManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_modify_skill(self, user_id, skill_id):
        # The user must hold the Skill update permission
        if not self.permission_checker.check_permission(user_id, SKILL_UPDATE):
            return False

        # The Skill must exist
        skill = self.get_skill(skill_id)
        if not skill:
            return False

        # The user must be the owner of the Skill
        if not self.is_skill_owner(user_id, skill):
            return False

        return True

    def get_skill(self, skill_id):
        # Placeholder: the Skill lookup is not specified in this section
        raise NotImplementedError

    def is_skill_owner(self, user_id, skill):
        return skill.owner_id == user_id
```

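### 4. Skill Deletion Permission

```python
class SkillDeletionManager:
    def __init__(self, permission_checker):
        # permission_checker is expected to provide check_permission()
        # and is_admin()
        self.permission_checker = permission_checker

    def can_delete_skill(self, user_id, skill_id):
        # The user must hold the Skill deletion permission
        if not self.permission_checker.check_permission(user_id, SKILL_DELETE):
            return False
        # The Skill must exist
        skill = self.get_skill(skill_id)
        if not skill:
            return False
        # Only the owner or an administrator may delete the Skill
        if not (self.is_skill_owner(user_id, skill) or
                self.permission_checker.is_admin(user_id)):
            return False
        # A Skill that other Skills depend on cannot be deleted
        if self.has_dependents(skill_id):
            return False
        return True

    def get_skill(self, skill_id):
        # Placeholder: the Skill lookup is not specified in this section
        raise NotImplementedError

    def is_skill_owner(self, user_id, skill):
        return skill.owner_id == user_id

    def has_dependents(self, skill_id):
        # Placeholder: the dependency lookup is not specified in this section.
        # Returns False, meaning no dependents are reported.
        return False
```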
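
`has_dependents` is left unimplemented above. The following is a minimal sketch of one possible implementation, assuming each Skill declares the Skills it depends on in a dictionary. The `DEPENDENCIES` data and the helper names are hypothetical.

```python
# Hypothetical dependency data: skill_id -> ids of the Skills it depends on.
DEPENDENCIES = {
    "report": ["fetch-data", "summarize"],
    "summarize": ["tokenize"],
    "fetch-data": [],
    "tokenize": [],
}


def dependents_of(skill_id, dependencies=DEPENDENCIES):
    """Return the sorted ids of Skills that directly depend on skill_id."""
    return sorted(sid for sid, deps in dependencies.items() if skill_id in deps)


def has_dependents(skill_id, dependencies=DEPENDENCIES):
    """Return True if at least one other Skill depends on skill_id."""
    return bool(dependents_of(skill_id, dependencies))


print(dependents_of("summarize"))  # ['report']
print(has_dependents("report"))    # False
```

Returning the list of dependents, not just a boolean, also lets the caller tell the user which Skills block the deletion.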

## Audit Log

### 1. Access Log

```python
from datetime import datetime

class AccessLogger:
    def __init__(self):
        self.logs = []

    def log_access(self, user_id, resource, action, result):
        # Record who did what to which resource, and the outcome
        log_entry = {
            "timestamp": datetime.now(),
            "user_id": user_id,
            "resource": resource,
            "action": action,
            "result": result
        }
        self.logs.append(log_entry)

    def get_access_logs(self, user_id=None, resource=None, action=None):
        # Each filter is applied only when a value is given
        filtered_logs = self.logs

        if user_id is not None:
            filtered_logs = [log for log in filtered_logs if log["user_id"] == user_id]

        if resource is not None:
            filtered_logs = [log for log in filtered_logs if log["resource"] == resource]

        if action is not None:
            filtered_logs = [log for log in filtered_logs if log["action"] == action]

        return filtered_logs
```
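
The in-memory list in `AccessLogger` is lost when the process exits, and `datetime` values are not directly JSON-serializable. The following is a minimal sketch of turning an entry into a single JSON line that can be appended to a file, assuming UTC timestamps in ISO 8601 format. `make_entry` and `to_json_line` are hypothetical helpers.

```python
import json
from datetime import datetime, timezone


def make_entry(user_id, resource, action, result, now=None):
    """Build an access-log entry with a UTC timestamp in ISO 8601 format."""
    now = now or datetime.now(timezone.utc)
    return {
        "timestamp": now.isoformat(),
        "user_id": user_id,
        "resource": resource,
        "action": action,
        "result": result,
    }


def to_json_line(entry):
    """Serialize one entry as a single JSON line with stable key order."""
    return json.dumps(entry, sort_keys=True, ensure_ascii=False)


entry = make_entry(
    "alice", "skill:summarize", "execute", "allowed",
    now=datetime(2024, 1, 8, 10, 0, tzinfo=timezone.utc),
)
line = to_json_line(entry)
print(line)
```

One entry per line keeps the log append-only and easy to filter with standard text tools.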

### 2. Permission Change Log

```python
from datetime import datetime

class PermissionChangeLogger:
    def __init__(self):
        self.changes = []

    def log_permission_change(self, user_id, target, old_permissions, new_permissions, changed_by):
        # Record the permissions before and after the change, and who made it
        change_entry = {
            "timestamp": datetime.now(),
            "user_id": user_id,
            "target": target,
            "old_permissions": old_permissions,
            "new_permissions": new_permissions,
            "changed_by": changed_by
        }
        self.changes.append(change_entry)

    def get_permission_changes(self, user_id=None, target=None):
        # Each filter is applied only when a value is given
        filtered_changes = self.changes
        if user_id is not None:
            filtered_changes = [change for change in filtered_changes if change["user_id"] == user_id]
        if target is not None:
            filtered_changes = [change for change in filtered_changes if change["target"] == target]
        return filtered_changes
```
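
When reviewing a change entry, it is often more useful to see which permissions were added or removed than to compare two full lists by eye. The following is a minimal sketch, assuming permissions are recorded as lists of name strings. `diff_permissions` is a hypothetical helper.

```python
def diff_permissions(old_permissions, new_permissions):
    """Return which permission names were added and which were removed."""
    old, new = set(old_permissions), set(new_permissions)
    return {"added": sorted(new - old), "removed": sorted(old - new)}


change = diff_permissions(
    ["skill:read", "skill:execute"],
    ["skill:read", "skill:update"],
)
print(change)  # {'added': ['skill:update'], 'removed': ['skill:execute']}
```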

Released under the MIT License.