Skip to content

16.5 Skills 版本管理

版本管理概述

Skills 的版本管理是维护和演进 Skills 的重要机制。本节将详细介绍 Skills 的版本控制、版本发布和版本迁移等内容。

版本控制

1. 版本号规范

1.1 语义化版本

语义化版本(SemVer)

版本格式

主版本号.次版本号.修订号(MAJOR.MINOR.PATCH)

版本规则

  • 主版本号(MAJOR) :不兼容的 API 修改
  • 次版本号(MINOR) :向下兼容的功能性新增
  • 修订号(PATCH) :向下兼容的问题修正

示例

  • 1.0.0 → 1.1.0:新增功能(向下兼容)
  • 1.1.0 → 1.1.1:修复 bug(向下兼容)
  • 1.1.1 → 2.0.0:不兼容的 API 变更

1.2 预发布版本

格式

主版本号.次版本号.修订号-预发布标识.预发布版本号

预发布标识

  • alpha:内部测试版
  • beta:公开测试版
  • rc:候选发布版

示例

  • 1.0.0-alpha.1:第一个 alpha 版本
  • 1.0.0-beta.1:第一个 beta 版本
  • 1.0.0-rc.1:第一个候选版本

1.3 构建元数据

格式

主版本号.次版本号.修订号+构建元数据

构建元数据

  • 构建日期
  • 构建号
  • Git 提交哈希

示例

  • 1.0.0+20240115
  • 1.0.0+build.123
  • 1.0.0+abc123def456

### 2\. 版本比较

#### 2.1 版本比较算法

    python


    class VersionComparator:
        """Order version strings of the form MAJOR.MINOR.PATCH[-prerelease][+build].

        Build metadata is parsed but never takes part in ordering.
        """

        def compare(self, v1, v2):
            """Return -1, 0 or 1 when ``v1`` sorts before, equal to or after ``v2``.

            Raises:
                ValueError: if a numeric component of either version is not an integer.
            """
            left = self.parse_version(v1)
            right = self.parse_version(v2)

            # Numeric core: the first differing component decides.
            for field in ("major", "minor", "patch"):
                if left[field] != right[field]:
                    return -1 if left[field] < right[field] else 1

            left_pre = left["prerelease"]
            right_pre = right["prerelease"]
            if left_pre and right_pre:
                return self.compare_prerelease(left_pre, right_pre)
            # With equal cores, a pre-release sorts before the plain release.
            if left_pre:
                return -1
            if right_pre:
                return 1
            return 0

        def parse_version(self, version):
            """Split ``version`` into its components.

            Returns:
                A dict with integer ``major``/``minor``/``patch`` (missing minor or
                patch default to 0) and string-or-None ``prerelease``/``build``.

            Raises:
                ValueError: if a numeric component is not an integer.
            """
            # Build metadata must be removed first: it may contain "." or "-",
            # which would otherwise corrupt the numeric core or the pre-release.
            remainder, plus, build = version.partition("+")
            build_metadata = build if plus else None

            core, dash, prerelease = remainder.partition("-")
            prerelease_part = prerelease if dash else None

            numbers = core.split(".")
            major = int(numbers[0])
            minor = int(numbers[1]) if len(numbers) > 1 else 0
            patch = int(numbers[2]) if len(numbers) > 2 else 0

            return {
                "major": major,
                "minor": minor,
                "patch": patch,
                "prerelease": prerelease_part,
                "build": build_metadata
            }

        def compare_prerelease(self, p1, p2):
            """Compare two dot-separated pre-release strings; return -1, 0 or 1."""
            ids1 = p1.split(".")
            ids2 = p2.split(".")

            for id1, id2 in zip(ids1, ids2):
                numeric1 = id1.isdigit()
                numeric2 = id2.isdigit()

                if numeric1 and numeric2:
                    # Both numeric: compare as integers so that 10 > 9.
                    num1, num2 = int(id1), int(id2)
                    if num1 != num2:
                        return -1 if num1 < num2 else 1
                elif numeric1 != numeric2:
                    # A numeric identifier always sorts before an alphanumeric one.
                    return -1 if numeric1 else 1
                elif id1 != id2:
                    # Both alphanumeric: plain string ordering.
                    return -1 if id1 < id2 else 1

            # All shared identifiers are equal: the shorter list sorts first.
            if len(ids1) != len(ids2):
                return -1 if len(ids1) < len(ids2) else 1
            return 0

#### 2.2 版本范围

    python


    class VersionRange:
        """Test versions against a range expression.

        Supported comparators are ``^``, ``~``, ``>=``, ``<=``, ``>``, ``<``,
        ``=``/``==`` and a bare version (exact match). Comparators separated by
        whitespace must all hold; groups separated by ``||`` are alternatives.
        """

        def __init__(self, range_spec):
            self.range_spec = range_spec
            self.comparator = VersionComparator()

        def satisfies(self, version):
            """Return True when ``version`` matches at least one ``||`` alternative."""
            alternatives = self._parse_alternatives(self.range_spec)
            if not alternatives:
                # Nothing parsable: stay permissive, as an empty constraint
                # list has always been treated as "matches".
                return True
            return any(
                all(self.check_constraint(version, constraint) for constraint in group)
                for group in alternatives
            )

        def parse_range(self, range_spec):
            """Return every constraint in ``range_spec`` as one flat list.

            Each constraint is ``{"operator": ..., "version": ...}``. The grouping
            into ``||`` alternatives is not visible in this flat form.
            """
            return [
                constraint
                for group in self._parse_alternatives(range_spec)
                for constraint in group
            ]

        def _parse_alternatives(self, range_spec):
            """Split ``range_spec`` on ``||`` and parse each non-empty alternative."""
            groups = []
            for alternative in range_spec.split("||"):
                group = self._parse_comparators(alternative)
                if group:
                    groups.append(group)
            return groups

        def _parse_comparators(self, text):
            """Parse the whitespace-separated comparators of one alternative."""
            import re

            # Optional operator (two-character forms listed first), optional
            # spaces, then the version token.
            pattern = r"(\^|~|>=|<=|>|<|==|=)?\s*([^\s<>=^~]+)"
            constraints = []
            for operator, version in re.findall(pattern, text):
                if operator in ("", "="):
                    # A bare version or "=" means an exact match.
                    operator = "=="
                constraints.append({
                    "operator": operator,
                    "version": version
                })
            return constraints

        def check_constraint(self, version, constraint):
            """Return True when ``version`` satisfies one parsed constraint."""
            operator = constraint["operator"]
            target = constraint["version"]

            if operator == "^":
                return self.check_caret(version, target)
            if operator == "~":
                return self.check_tilde(version, target)

            accepted_results = {
                "==": (0,),
                ">=": (0, 1),
                "<=": (-1, 0),
                ">": (1,),
                "<": (-1,),
            }
            if operator not in accepted_results:
                return False
            return self.comparator.compare(version, target) in accepted_results[operator]

        def check_caret(self, version, constraint):
            """Return True when ``constraint <= version < (major + 1).0.0``.

            NOTE(review): 0.x constraints are not special-cased, so ``^0.2.3``
            also accepts 0.9.0 -- confirm this is the intended policy.
            """
            constraint_parts = self.comparator.parse_version(constraint)
            upper_bound = f"{constraint_parts['major'] + 1}.0.0"
            return (self.comparator.compare(version, constraint) >= 0 and
                    self.comparator.compare(version, upper_bound) < 0)

        def check_tilde(self, version, constraint):
            """Return True when ``constraint <= version < major.(minor + 1).0``."""
            constraint_parts = self.comparator.parse_version(constraint)
            upper_bound = f"{constraint_parts['major']}.{constraint_parts['minor'] + 1}.0"
            return (self.comparator.compare(version, constraint) >= 0 and
                    self.comparator.compare(version, upper_bound) < 0)

### 3\. 版本存储

#### 3.1 版本仓库

    python


    class VersionRepository:
        """File-system store laid out as ``<storage_path>/<skill_id>/<version>/skill.json``."""

        def __init__(self, storage_path):
            self.storage_path = storage_path
            self.comparator = VersionComparator()

        def save_version(self, skill_id, version, skill_data):
            """Write ``skill_data`` as JSON, creating the version directory if needed."""
            version_path = self.get_version_path(skill_id, version)
            os.makedirs(os.path.dirname(version_path), exist_ok=True)

            # Explicit encoding keeps the file readable across platforms.
            with open(version_path, 'w', encoding='utf-8') as f:
                json.dump(skill_data, f, indent=2)

        def load_version(self, skill_id, version):
            """Return the stored JSON data for one version.

            Raises:
                VersionNotFoundError: if no file exists for that version.
            """
            version_path = self.get_version_path(skill_id, version)

            if not os.path.exists(version_path):
                raise VersionNotFoundError(version)

            with open(version_path, 'r', encoding='utf-8') as f:
                return json.load(f)

        def list_versions(self, skill_id):
            """Return the skill's version directory names, oldest first.

            Returns an empty list when the skill directory does not exist.
            """
            from functools import cmp_to_key

            skill_path = self.get_skill_path(skill_id)

            if not os.path.exists(skill_path):
                return []

            versions = [
                entry for entry in os.listdir(skill_path)
                if os.path.isdir(os.path.join(skill_path, entry))
            ]

            # Order with the comparator itself: parse_version() returns dicts,
            # which cannot be used as sort keys.
            versions.sort(key=cmp_to_key(self.comparator.compare))

            return versions

        def get_latest_version(self, skill_id):
            """Return the highest stored version, or None when there is none."""
            versions = self.list_versions(skill_id)

            if not versions:
                return None

            return versions[-1]

        def get_version_path(self, skill_id, version):
            """Return the path of the JSON file for one version."""
            return os.path.join(self.storage_path, skill_id, version, "skill.json")

        def get_skill_path(self, skill_id):
            """Return the directory that holds every version of a skill."""
            return os.path.join(self.storage_path, skill_id)

#### 3.2 版本索引

    python


    class VersionIndex:
        """JSON-backed index mapping skill id -> version -> metadata."""

        def __init__(self, index_path):
            self.index_path = index_path
            self.index = self.load_index()

        def load_index(self):
            """Return the index stored at ``index_path``, or ``{}`` if the file is absent."""
            if os.path.exists(self.index_path):
                with open(self.index_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            return {}

        def save_index(self):
            """Write the in-memory index back to ``index_path``."""
            with open(self.index_path, 'w', encoding='utf-8') as f:
                json.dump(self.index, f, indent=2)

        def add_version(self, skill_id, version, metadata):
            """Record ``metadata`` for a version and persist the index immediately."""
            self.index.setdefault(skill_id, {})[version] = metadata
            self.save_index()

        def get_version_metadata(self, skill_id, version):
            """Return the stored metadata, or None for an unknown skill or version."""
            return self.index.get(skill_id, {}).get(version)

        def list_versions(self, skill_id):
            """Return the indexed version strings of a skill in insertion order."""
            return list(self.index.get(skill_id, {}))

        def get_latest_version(self, skill_id):
            """Return the highest indexed version, or None when none is indexed."""
            versions = self.list_versions(skill_id)
            if not versions:
                return None

            from functools import cmp_to_key

            comparator = VersionComparator()
            # Rank with compare(): parse_version() returns dicts, which max()
            # cannot order.
            return max(versions, key=cmp_to_key(comparator.compare))

        def search_versions(self, skill_id, version_range):
            """Return the indexed versions that satisfy ``version_range``."""
            range_checker = VersionRange(version_range)
            return [
                version for version in self.list_versions(skill_id)
                if range_checker.satisfies(version)
            ]

## 版本发布

### 1\. 发布流程

#### 1.1 发布准备

    python


    class ReleasePreparer:
        """Validate a planned release and build its metadata record."""

        def __init__(self):
            self.version_repository = VersionRepository("./skills")
            self.version_index = VersionIndex("./index.json")

        def prepare_release(self, skill_id, version, release_notes):
            """Return the metadata dict for a new release.

            Raises:
                InvalidVersionError: if ``version`` is not a well-formed version string.
                VersionAlreadyExistsError: if the repository already holds ``version``.
            """
            if not self.validate_version(version):
                raise InvalidVersionError(version)

            if self.version_exists(skill_id, version):
                raise VersionAlreadyExistsError(version)

            skill_data = self.load_skill_data(skill_id)

            return {
                "version": version,
                "release_date": datetime.now().isoformat(),
                "release_notes": release_notes,
                "checksum": self.calculate_checksum(skill_data)
            }

        def validate_version(self, version):
            """Return True when ``version`` is MAJOR.MINOR.PATCH[-prerelease][+build].

            Pre-release and build parts are dot-separated runs of ASCII letters,
            digits and hyphens, so ``1.0.0-rc.1`` and ``1.0.0+build.123`` are valid.
            """
            identifiers = r'[0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*'
            pattern = r'\d+\.\d+\.\d+(-' + identifiers + r')?(\+' + identifiers + r')?'
            # fullmatch: unlike match() with "$", a trailing newline is rejected.
            return re.fullmatch(pattern, version) is not None

        def version_exists(self, skill_id, version):
            """Return True when the repository already lists ``version`` for the skill."""
            return version in self.version_repository.list_versions(skill_id)

        def load_skill_data(self, skill_id):
            """Placeholder hook: no loading is implemented here, so it returns None."""
            pass

        def calculate_checksum(self, skill_data):
            """Return the SHA-256 hex digest of the key-sorted JSON form of ``skill_data``."""
            canonical = json.dumps(skill_data, sort_keys=True)
            return hashlib.sha256(canonical.encode()).hexdigest()

#### 1.2 发布执行

    python


    class ReleasePublisher:
        """Store a release, index it, tag it in git and notify subscribers."""

        def __init__(self):
            self.version_repository = VersionRepository("./skills")
            self.version_index = VersionIndex("./index.json")

        def publish_release(self, skill_id, version, skill_data, metadata):
            """Run the publish steps in order: save, index, tag, notify."""
            self.version_repository.save_version(skill_id, version, skill_data)
            self.version_index.add_version(skill_id, version, metadata)
            self.create_tag(skill_id, version)
            self.notify_subscribers(skill_id, version, metadata)

        def create_tag(self, skill_id, version):
            """Create the annotated git tag ``<skill_id>/v<version>``.

            Raises:
                subprocess.CalledProcessError: if the git command exits non-zero.
            """
            tag_name = f"{skill_id}/v{version}"
            tag_message = f"Release {skill_id} version {version}"
            # Argument list, not a shell string: nothing is shell-interpreted.
            subprocess.run([
                "git", "tag", "-a", tag_name, "-m", tag_message
            ], check=True)

        def notify_subscribers(self, skill_id, version, metadata):
            """Send one notification per subscriber of ``skill_id``."""
            # get_subscribers() is a placeholder that returns None until it is
            # implemented; treat that as "no subscribers" instead of failing.
            subscribers = self.get_subscribers(skill_id) or ()
            for subscriber in subscribers:
                self.send_notification(subscriber, skill_id, version, metadata)

        def get_subscribers(self, skill_id):
            """Placeholder hook: no lookup is implemented here, so it returns None."""
            pass

        def send_notification(self, subscriber, skill_id, version, metadata):
            """Placeholder hook: no delivery is implemented here."""
            pass

### 2\. 变更日志

#### 2.1 变更记录

    python


    class ChangeLogManager:
        """In-memory change log keyed by skill id, then by version."""

        def __init__(self):
            self.change_logs = {}

        def add_change(self, skill_id, version, change_type, description):
            """Append ``description`` to the ``change_type`` list of a version.

            The version entry is created on first use and stamped with the
            current time. An unknown ``change_type`` raises KeyError.
            """
            per_skill = self.change_logs.setdefault(skill_id, {})

            if version not in per_skill:
                categories = ("added", "changed", "deprecated",
                              "removed", "fixed", "security")
                per_skill[version] = {
                    "version": version,
                    "date": datetime.now().isoformat(),
                    "changes": {category: [] for category in categories}
                }

            per_skill[version]["changes"][change_type].append(description)

        def get_change_log(self, skill_id, version):
            """Return the entry for one version, or None when nothing is recorded."""
            return self.change_logs.get(skill_id, {}).get(version)

        def get_all_change_logs(self, skill_id):
            """Return every version entry of a skill, or ``{}`` for an unknown skill."""
            return self.change_logs.get(skill_id, {})

        def generate_release_notes(self, skill_id, version):
            """Render the change log of one version as Markdown text."""
            entry = self.get_change_log(skill_id, version)

            if not entry:
                return "No changes recorded."

            lines = [f"# Release {version}", f"Date: {entry['date']}", ""]
            recorded = entry["changes"]

            # Sections appear in this fixed order; empty ones are left out.
            sections = (
                ("added", "## Added"),
                ("changed", "## Changed"),
                ("deprecated", "## Deprecated"),
                ("removed", "## Removed"),
                ("fixed", "## Fixed"),
                ("security", "## Security"),
            )
            for category, heading in sections:
                items = recorded[category]
                if not items:
                    continue
                lines.append(heading)
                lines.extend(f"- {item}" for item in items)
                lines.append("")

            return "\n".join(lines)

#### 2.2 变更对比

    python


    class ChangeComparator:
        """Diff the ``parameters`` and ``features`` of two stored skill versions."""

        def __init__(self):
            self.version_repository = VersionRepository("./skills")

        def compare_versions(self, skill_id, from_version, to_version):
            """Return ``{"added": [...], "removed": [...], "modified": [...]}``.

            Each list holds parameter names first, then features.
            """
            from_data = self.version_repository.load_version(skill_id, from_version)
            to_data = self.version_repository.load_version(skill_id, to_version)
            return {
                "added": (self.find_added_parameters(from_data, to_data)
                          + self.find_added_features(from_data, to_data)),
                "removed": (self.find_removed_parameters(from_data, to_data)
                            + self.find_removed_features(from_data, to_data)),
                "modified": (self.find_modified_parameters(from_data, to_data)
                             + self.find_modified_features(from_data, to_data)),
            }

        def find_added_parameters(self, from_data, to_data):
            """Return parameter names present only in ``to_data``, in its order."""
            from_params = from_data.get("parameters", {})
            to_params = to_data.get("parameters", {})
            return [name for name in to_params if name not in from_params]

        def find_removed_parameters(self, from_data, to_data):
            """Return parameter names present only in ``from_data``, in its order."""
            from_params = from_data.get("parameters", {})
            to_params = to_data.get("parameters", {})
            return [name for name in from_params if name not in to_params]

        def find_modified_parameters(self, from_data, to_data):
            """Return names of parameters present in both versions with unequal values."""
            from_params = from_data.get("parameters", {})
            to_params = to_data.get("parameters", {})
            return [
                name for name in from_params
                if name in to_params and from_params[name] != to_params[name]
            ]

        def find_added_features(self, from_data, to_data):
            """Return features present only in ``to_data``, de-duplicated, in its order."""
            known = set(from_data.get("features", []))
            fresh = (item for item in to_data.get("features", []) if item not in known)
            # dict.fromkeys drops duplicates while keeping first-seen order.
            return list(dict.fromkeys(fresh))

        def find_removed_features(self, from_data, to_data):
            """Return features present only in ``from_data``, de-duplicated, in its order."""
            kept = set(to_data.get("features", []))
            gone = (item for item in from_data.get("features", []) if item not in kept)
            return list(dict.fromkeys(gone))

        def find_modified_features(self, from_data, to_data):
            """Return the modified features; currently always an empty list.

            No detection is implemented. The empty list (rather than None) lets
            compare_versions() concatenate the result.
            """
            return []

## 版本迁移

### 1\. 迁移脚本

#### 1.1 迁移定义

    python


    class Migration:
        """One migration step that transforms data from ``from_version`` to ``to_version``."""

        def __init__(self, from_version, to_version, migrate_func):
            self.from_version = from_version
            self.to_version = to_version
            self.migrate_func = migrate_func

        def can_migrate(self, from_version, to_version):
            """Return True when this step lies inside the requested upgrade range.

            The step applies when it starts at or after the requested
            ``from_version`` and ends at or before the requested ``to_version``.
            That lets several consecutive steps (1.0 -> 1.1, 1.1 -> 1.2) all
            qualify for one request (1.0 -> 1.2).
            """
            comparator = VersionComparator()
            return (comparator.compare(self.from_version, from_version) >= 0 and
                    comparator.compare(self.to_version, to_version) <= 0)

        def migrate(self, data):
            """Apply the migration function to ``data`` and return its result."""
            return self.migrate_func(data)

#### 1.2 迁移注册

    python


    class MigrationRegistry:
        """Collect migrations and select the ones that apply to an upgrade."""

        def __init__(self):
            self.migrations = []

        def register_migration(self, migration):
            """Add ``migration`` to the registry."""
            self.migrations.append(migration)

        def get_migrations(self, from_version, to_version):
            """Return the applicable migrations ordered by ascending ``from_version``."""
            from functools import cmp_to_key

            applicable_migrations = [
                migration for migration in self.migrations
                if migration.can_migrate(from_version, to_version)
            ]

            comparator = VersionComparator()
            # Order with compare(): parse_version() returns dicts, which
            # cannot serve as sort keys.
            order = cmp_to_key(comparator.compare)
            applicable_migrations.sort(key=lambda m: order(m.from_version))
            return applicable_migrations

### 2\. 迁移执行

#### 2.1 数据迁移

    python


    class DataMigrator:
        """Run the registered migrations over skill data."""

        def __init__(self, migration_registry):
            self.migration_registry = migration_registry

        def migrate_data(self, data, from_version, to_version):
            """Apply each applicable migration in registry order and return the result.

            ``data`` is returned unchanged when no migration applies.
            """
            migrated = data
            for migration in self.migration_registry.get_migrations(from_version, to_version):
                migrated = migration.migrate(migrated)
            return migrated

        def migrate_skill(self, skill_id, from_version, to_version, storage_path="./skills"):
            """Load a stored version, migrate it and save it under ``to_version``.

            Args:
                storage_path: root directory of the version repository.

            Returns:
                The migrated data.
            """
            version_repository = VersionRepository(storage_path)
            data = version_repository.load_version(skill_id, from_version)
            migrated_data = self.migrate_data(data, from_version, to_version)
            version_repository.save_version(skill_id, to_version, migrated_data)
            return migrated_data

#### 2.2 配置迁移

    python


    class ConfigMigrator:
        """Run the registered migrations over configuration objects."""

        def __init__(self, migration_registry):
            self.migration_registry = migration_registry

        def migrate_config(self, config, from_version, to_version):
            """Apply each applicable migration in registry order and return the result.

            ``config`` is returned unchanged when no migration applies.
            """
            migrated = config
            for migration in self.migration_registry.get_migrations(from_version, to_version):
                migrated = migration.migrate(migrated)
            return migrated

        def migrate_user_config(self, user_id, from_version, to_version):
            """Load a user's config, migrate it, save it and return it."""
            config = self.load_user_config(user_id)
            migrated_config = self.migrate_config(config, from_version, to_version)
            self.save_user_config(user_id, migrated_config)
            return migrated_config

        def load_user_config(self, user_id):
            """Placeholder hook: no loading is implemented here, so it returns None."""
            pass

        def save_user_config(self, user_id, config):
            """Placeholder hook: no persistence is implemented here."""
            pass

## 版本回滚

### 1\. 回滚准备

    python


    class RollbackPreparer:
        """Check a rollback target, back up the current version and build the plan."""

        def __init__(self):
            self.version_repository = VersionRepository("./skills")
            self.backup_repository = BackupRepository("./backups")

        def prepare_rollback(self, skill_id, from_version, to_version):
            """Return the rollback plan for going from ``from_version`` to ``to_version``.

            Raises:
                VersionNotFoundError: if ``to_version`` is not in the repository.
            """
            # The target must exist before anything is backed up.
            if not self.version_exists(skill_id, to_version):
                raise VersionNotFoundError(to_version)

            # Snapshot the version being rolled back from.
            backup_id = self.create_backup(skill_id, from_version)

            steps = self.generate_rollback_steps(skill_id, from_version, to_version)
            return {
                "skill_id": skill_id,
                "from_version": from_version,
                "to_version": to_version,
                "backup_id": backup_id,
                "steps": steps
            }

        def version_exists(self, skill_id, version):
            """Return True when the repository lists ``version`` for the skill."""
            return version in self.version_repository.list_versions(skill_id)

        def create_backup(self, skill_id, version):
            """Ask the backup repository for a backup and return its id."""
            return self.backup_repository.create_backup(skill_id, version)

        def generate_rollback_steps(self, skill_id, from_version, to_version):
            """Return the five ordered rollback steps, numbered from 1."""
            outline = (
                ("stop_skill", "停止 Skill"),
                ("restore_version", f"恢复到版本 {to_version}"),
                ("migrate_config", "迁移配置"),
                ("start_skill", "启动 Skill"),
                ("verify", "验证回滚"),
            )
            return [
                {"step": number, "action": action, "description": description}
                for number, (action, description) in enumerate(outline, start=1)
            ]

### 2\. 回滚执行

    python

    class RollbackExecutor:
        """Execute a rollback plan step by step, restoring the backup on failure."""

        def __init__(self):
            self.version_repository = VersionRepository("./skills")
            self.backup_repository = BackupRepository("./backups")
            self.config_migrator = ConfigMigrator(MigrationRegistry())

        def execute_rollback(self, rollback_plan):
            """Run every step of ``rollback_plan`` in order.

            Returns:
                True when all steps complete.

            Raises:
                Exception: whatever a step raised, re-raised after the plan's
                    backup has been restored.
            """
            try:
                for step in rollback_plan["steps"]:
                    self.execute_step(rollback_plan, step)
                return True
            except Exception:
                self.restore_backup(rollback_plan["backup_id"])
                # Bare raise keeps the original traceback intact.
                raise

        def execute_step(self, rollback_plan, step):
            """Dispatch one plan step by its ``action``; unknown actions are ignored."""
            action = step["action"]
            if action == "stop_skill":
                self.stop_skill(rollback_plan["skill_id"])
            elif action == "restore_version":
                self.restore_version(
                    rollback_plan["skill_id"],
                    rollback_plan["to_version"]
                )
            elif action == "migrate_config":
                self.migrate_config(
                    rollback_plan["from_version"],
                    rollback_plan["to_version"]
                )
            elif action == "start_skill":
                self.start_skill(rollback_plan["skill_id"])
            elif action == "verify":
                self.verify_rollback(rollback_plan)

        def stop_skill(self, skill_id):
            """Placeholder hook: stopping a skill is not implemented here."""
            pass

        def restore_version(self, skill_id, version):
            """Placeholder hook: restoring a version is not implemented here."""
            pass

        def migrate_config(self, from_version, to_version):
            """Placeholder hook: config migration is not implemented here."""
            pass

        def start_skill(self, skill_id):
            """Placeholder hook: starting a skill is not implemented here."""
            pass

        def verify_rollback(self, rollback_plan):
            """Placeholder hook: verification is not implemented here."""
            pass

        def restore_backup(self, backup_id):
            """Restore ``backup_id`` through the backup repository."""
            self.backup_repository.restore_backup(backup_id)

基于 MIT 许可发布 | 永久导航