16.5 Skills 版本管理
版本管理概述
Skills 的版本管理是维护和演进 Skills 的重要机制。本节将详细介绍 Skills 的版本控制、版本发布和版本迁移等内容。
版本控制
1. 版本号规范
1.1 语义化版本
语义化版本(SemVer)
版本格式
主版本号.次版本号.修订号(MAJOR.MINOR.PATCH)
版本规则
- 主版本号(MAJOR) :不兼容的 API 修改
- 次版本号(MINOR) :向下兼容的功能性新增
- 修订号(PATCH) :向下兼容的问题修正
示例
- 1.0.0 → 1.1.0:新增功能(向下兼容)
- 1.1.0 → 1.1.1:修复 bug(向下兼容)
- 1.1.1 → 2.0.0:不兼容的 API 变更
1.2 预发布版本
格式
主版本号.次版本号.修订号-预发布标识.预发布版本号
预发布标识
- alpha:内部测试版
- beta:公开测试版
- rc:候选发布版
示例
- 1.0.0-alpha.1:第一个 alpha 版本
- 1.0.0-beta.1:第一个 beta 版本
- 1.0.0-rc.1:第一个候选版本
1.3 构建元数据
格式
主版本号.次版本号.修订号+构建元数据
构建元数据
- 构建日期
- 构建号
- Git 提交哈希
示例
- 1.0.0+20240115
- 1.0.0+build.123
- 1.0.0+abc123def456
python
### 2\. 版本比较
#### 2.1 版本比较算法
python
class VersionComparator:
def compare(self, v1, v2):
v1_parts = self.parse_version(v1)
v2_parts = self.parse_version(v2)
# 比较主版本号
if v1_parts["major"] != v2_parts["major"]:
return -1 if v1_parts["major"] < v2_parts["major"] else 1
# 比较次版本号
if v1_parts["minor"] != v2_parts["minor"]:
return -1 if v1_parts["minor"] < v2_parts["minor"] else 1
# 比较修订号
if v1_parts["patch"] != v2_parts["patch"]:
return -1 if v1_parts["patch"] < v2_parts["patch"] else 1
# 比较预发布版本
if v1_parts["prerelease"] and v2_parts["prerelease"]:
return self.compare_prerelease(v1_parts["prerelease"], v2_parts["prerelease"])
elif v1_parts["prerelease"]:
return -1
elif v2_parts["prerelease"]:
return 1
return 0
def parse_version(self, version):
# 解析版本号
parts = version.split("-", 1)
version_part = parts[0]
prerelease_part = parts[1] if len(parts) > 1 else None
version_numbers = version_part.split(".")
major = int(version_numbers[0]) if len(version_numbers) > 0 else 0
minor = int(version_numbers[1]) if len(version_numbers) > 1 else 0
patch = int(version_numbers[2]) if len(version_numbers) > 2 else 0
# 解析构建元数据
build_metadata = None
if "+" in version_part:
version_part, build_metadata = version_part.split("+", 1)
return {
"major": major,
"minor": minor,
"patch": patch,
"prerelease": prerelease_part,
"build": build_metadata
}
def compare_prerelease(self, p1, p2):
# 比较预发布版本
p1_parts = p1.split(".")
p2_parts = p2.split(".")
for i in range(max(len(p1_parts), len(p2_parts))):
if i >= len(p1_parts):
return -1
if i >= len(p2_parts):
return 1
part1 = p1_parts[i]
part2 = p2_parts[i]
# 数字比较
if part1.isdigit() and part2.isdigit():
num1 = int(part1)
num2 = int(part2)
if num1 != num2:
return -1 if num1 < num2 else 1
else:
# 字符串比较
if part1 != part2:
return -1 if part1 < part2 else 1
return 0
#### 2.2 版本范围
python
class VersionRange:
def __init__(self, range_spec):
self.range_spec = range_spec
self.comparator = VersionComparator()
def satisfies(self, version):
constraints = self.parse_range(self.range_spec)
for constraint in constraints:
if not self.check_constraint(version, constraint):
return False
return True
def parse_range(self, range_spec):
constraints = []
if not any(c in range_spec for c in ['^', '~', '>=', '<=', '>', '<', '||']):
constraints.append({
"operator": "==",
"version": range_spec
})
return constraints
parts = range_spec.split("||")
for part in parts:
part = part.strip()
if part.startswith("^"):
version = part[1:]
constraints.append({
"operator": "^",
"version": version
})
elif part.startswith("~"):
version = part[1:]
constraints.append({
"operator": "~",
"version": version
})
elif part.startswith(">="):
version = part[2:]
constraints.append({
"operator": ">=",
"version": version
})
elif part.startswith("<="):
version = part[2:]
constraints.append({
"operator": "<=",
"version": version
})
elif part.startswith(">"):
version = part[1:]
constraints.append({
"operator": ">",
"version": version
})
elif part.startswith("<"):
version = part[1:]
constraints.append({
"operator": "<",
"version": version
})
return constraints
def check_constraint(self, version, constraint):
operator = constraint["operator"]
constraint_version = constraint["version"]
if operator == "==":
return self.comparator.compare(version, constraint_version) == 0
elif operator == ">=":
return self.comparator.compare(version, constraint_version) >= 0
elif operator == "<=":
return self.comparator.compare(version, constraint_version) <= 0
elif operator == ">":
return self.comparator.compare(version, constraint_version) > 0
elif operator == "<":
return self.comparator.compare(version, constraint_version) < 0
elif operator == "^":
return self.check_caret(version, constraint_version)
elif operator == "~":
return self.check_tilde(version, constraint_version)
return False
def check_caret(self, version, constraint):
constraint_parts = self.comparator.parse_version(constraint)
min_version = constraint
max_version = f"{constraint_parts['major'] + 1}.0.0"
return (self.comparator.compare(version, min_version) >= 0 and
self.comparator.compare(version, max_version) < 0)
def check_tilde(self, version, constraint):
constraint_parts = self.comparator.parse_version(constraint)
min_version = constraint
max_version = f"{constraint_parts['major']}.{constraint_parts['minor'] + 1}.0"
return (self.comparator.compare(version, min_version) >= 0 and
self.comparator.compare(version, max_version) < 0)
### 3\. 版本存储
#### 3.1 版本仓库
python
class VersionRepository:
def __init__(self, storage_path):
self.storage_path = storage_path
self.comparator = VersionComparator()
def save_version(self, skill_id, version, skill_data):
version_path = self.get_version_path(skill_id, version)
os.makedirs(os.path.dirname(version_path), exist_ok=True)
with open(version_path, 'w') as f:
json.dump(skill_data, f, indent=2)
def load_version(self, skill_id, version):
version_path = self.get_version_path(skill_id, version)
if not os.path.exists(version_path):
raise VersionNotFoundError(version)
with open(version_path, 'r') as f:
return json.load(f)
def list_versions(self, skill_id):
skill_path = self.get_skill_path(skill_id)
if not os.path.exists(skill_path):
return []
versions = []
for item in os.listdir(skill_path):
if os.path.isdir(os.path.join(skill_path, item)):
versions.append(item)
versions.sort(key=lambda v: self.comparator.parse_version(v))
return versions
def get_latest_version(self, skill_id):
versions = self.list_versions(skill_id)
if not versions:
return None
return versions[-1]
def get_version_path(self, skill_id, version):
return os.path.join(self.storage_path, skill_id, version, "skill.json")
def get_skill_path(self, skill_id):
return os.path.join(self.storage_path, skill_id)
#### 3.2 版本索引
python
class VersionIndex:
def __init__(self, index_path):
self.index_path = index_path
self.index = self.load_index()
def load_index(self):
if os.path.exists(self.index_path):
with open(self.index_path, 'r') as f:
return json.load(f)
return {}
def save_index(self):
with open(self.index_path, 'w') as f:
json.dump(self.index, f, indent=2)
def add_version(self, skill_id, version, metadata):
if skill_id not in self.index:
self.index[skill_id] = {}
self.index[skill_id][version] = metadata
self.save_index()
def get_version_metadata(self, skill_id, version):
return self.index.get(skill_id, {}).get(version)
def list_versions(self, skill_id):
versions = self.index.get(skill_id, {})
return list(versions.keys())
def get_latest_version(self, skill_id):
versions = self.list_versions(skill_id)
if not versions:
return None
comparator = VersionComparator()
return max(versions, key=lambda v: comparator.parse_version(v))
def search_versions(self, skill_id, version_range):
range_checker = VersionRange(version_range)
versions = self.list_versions(skill_id)
return [v for v in versions if range_checker.satisfies(v)]
## 版本发布
### 1\. 发布流程
#### 1.1 发布准备
python
class ReleasePreparer:
def __init__(self):
self.version_repository = VersionRepository("./skills")
self.version_index = VersionIndex("./index.json")
def prepare_release(self, skill_id, version, release_notes):
if not self.validate_version(version):
raise InvalidVersionError(version)
if self.version_exists(skill_id, version):
raise VersionAlreadyExistsError(version)
skill_data = self.load_skill_data(skill_id)
metadata = {
"version": version,
"release_date": datetime.now().isoformat(),
"release_notes": release_notes,
"checksum": self.calculate_checksum(skill_data)
}
return metadata
def validate_version(self, version):
pattern = r'^\d+\.\d+\.\d+(-[a-zA-Z0-9]+(\.\d+)?)?(\+[a-zA-Z0-9]+)?$'
return re.match(pattern, version) is not None
def version_exists(self, skill_id, version):
versions = self.version_repository.list_versions(skill_id)
return version in versions
def load_skill_data(self, skill_id):
pass
def calculate_checksum(self, skill_data):
data_str = json.dumps(skill_data, sort_keys=True)
return hashlib.sha256(data_str.encode()).hexdigest()
#### 1.2 发布执行
python
class ReleasePublisher:
def __init__(self):
self.version_repository = VersionRepository("./skills")
self.version_index = VersionIndex("./index.json")
def publish_release(self, skill_id, version, skill_data, metadata):
self.version_repository.save_version(skill_id, version, skill_data)
self.version_index.add_version(skill_id, version, metadata)
self.create_tag(skill_id, version)
self.notify_subscribers(skill_id, version, metadata)
def create_tag(self, skill_id, version):
tag_name = f"{skill_id}/v{version}"
tag_message = f"Release {skill_id} version {version}"
subprocess.run([
"git", "tag", "-a", tag_name, "-m", tag_message
], check=True)
def notify_subscribers(self, skill_id, version, metadata):
subscribers = self.get_subscribers(skill_id)
for subscriber in subscribers:
self.send_notification(subscriber, skill_id, version, metadata)
def get_subscribers(self, skill_id):
pass
def send_notification(self, subscriber, skill_id, version, metadata):
pass
### 2\. 变更日志
#### 2.1 变更记录
python
class ChangeLogManager:
def __init__(self):
self.change_logs = {}
def add_change(self, skill_id, version, change_type, description):
if skill_id not in self.change_logs:
self.change_logs[skill_id] = {}
if version not in self.change_logs[skill_id]:
self.change_logs[skill_id][version] = {
"version": version,
"date": datetime.now().isoformat(),
"changes": {
"added": [],
"changed": [],
"deprecated": [],
"removed": [],
"fixed": [],
"security": []
}
}
self.change_logs[skill_id][version]["changes"][change_type].append(description)
def get_change_log(self, skill_id, version):
return self.change_logs.get(skill_id, {}).get(version)
def get_all_change_logs(self, skill_id):
return self.change_logs.get(skill_id, {})
def generate_release_notes(self, skill_id, version):
change_log = self.get_change_log(skill_id, version)
if not change_log:
return "No changes recorded."
notes = [f"# Release {version}"]
notes.append(f"Date: {change_log['date']}")
notes.append("")
changes = change_log["changes"]
if changes["added"]:
notes.append("## Added")
for change in changes["added"]:
notes.append(f"- {change}")
notes.append("")
if changes["changed"]:
notes.append("## Changed")
for change in changes["changed"]:
notes.append(f"- {change}")
notes.append("")
if changes["deprecated"]:
notes.append("## Deprecated")
for change in changes["deprecated"]:
notes.append(f"- {change}")
notes.append("")
if changes["removed"]:
notes.append("## Removed")
for change in changes["removed"]:
notes.append(f"- {change}")
notes.append("")
if changes["fixed"]:
notes.append("## Fixed")
for change in changes["fixed"]:
notes.append(f"- {change}")
notes.append("")
if changes["security"]:
notes.append("## Security")
for change in changes["security"]:
notes.append(f"- {change}")
notes.append("")
return "\n".join(notes)
#### 2.2 变更对比
python
class ChangeComparator:
def __init__(self):
self.version_repository = VersionRepository("./skills")
def compare_versions(self, skill_id, from_version, to_version):
from_data = self.version_repository.load_version(skill_id, from_version)
to_data = self.version_repository.load_version(skill_id, to_version)
changes = {
"added": [],
"removed": [],
"modified": []
}
changes["added"].extend(self.find_added_parameters(from_data, to_data))
changes["removed"].extend(self.find_removed_parameters(from_data, to_data))
changes["modified"].extend(self.find_modified_parameters(from_data, to_data))
changes["added"].extend(self.find_added_features(from_data, to_data))
changes["removed"].extend(self.find_removed_features(from_data, to_data))
changes["modified"].extend(self.find_modified_features(from_data, to_data))
return changes
def find_added_parameters(self, from_data, to_data):
from_params = set(from_data.get("parameters", {}).keys())
to_params = set(to_data.get("parameters", {}).keys())
return list(to_params - from_params)
def find_removed_parameters(self, from_data, to_data):
from_params = set(from_data.get("parameters", {}).keys())
to_params = set(to_data.get("parameters", {}).keys())
return list(from_params - to_params)
def find_modified_parameters(self, from_data, to_data):
from_params = from_data.get("parameters", {})
to_params = to_data.get("parameters", {})
modified = []
for param in from_params:
if param in to_params and from_params[param] != to_params[param]:
modified.append(param)
return modified
def find_added_features(self, from_data, to_data):
from_features = set(from_data.get("features", []))
to_features = set(to_data.get("features", []))
return list(to_features - from_features)
def find_removed_features(self, from_data, to_data):
from_features = set(from_data.get("features", []))
to_features = set(to_data.get("features", []))
return list(from_features - to_features)
def find_modified_features(self, from_data, to_data):
pass
## 版本迁移
## 版本迁移
### 1\. 迁移脚本
#### 1.1 迁移定义
python
class Migration:
def __init__(self, from_version, to_version, migrate_func):
self.from_version = from_version
self.to_version = to_version
self.migrate_func = migrate_func
def can_migrate(self, from_version, to_version):
comparator = VersionComparator()
return (comparator.compare(from_version, self.from_version) >= 0 and
comparator.compare(to_version, self.to_version) <= 0)
def migrate(self, data):
return self.migrate_func(data)
#### 1.2 迁移注册
python
class MigrationRegistry:
def __init__(self):
self.migrations = []
def register_migration(self, migration):
self.migrations.append(migration)
def get_migrations(self, from_version, to_version):
applicable_migrations = []
for migration in self.migrations:
if migration.can_migrate(from_version, to_version):
applicable_migrations.append(migration)
comparator = VersionComparator()
applicable_migrations.sort(key=lambda m: comparator.parse_version(m.from_version))
return applicable_migrations
### 2\. 迁移执行
#### 2.1 数据迁移
python
class DataMigrator:
def __init__(self, migration_registry):
self.migration_registry = migration_registry
def migrate_data(self, data, from_version, to_version):
migrations = self.migration_registry.get_migrations(from_version, to_version)
current_data = data
current_version = from_version
for migration in migrations:
current_data = migration.migrate(current_data)
current_version = migration.to_version
return current_data
def migrate_skill(self, skill_id, from_version, to_version):
version_repository = VersionRepository("./skills")
data = version_repository.load_version(skill_id, from_version)
migrated_data = self.migrate_data(data, from_version, to_version)
version_repository.save_version(skill_id, to_version, migrated_data)
return migrated_data
#### 2.2 配置迁移
python
class ConfigMigrator:
def __init__(self, migration_registry):
self.migration_registry = migration_registry
def migrate_config(self, config, from_version, to_version):
migrations = self.migration_registry.get_migrations(from_version, to_version)
current_config = config
current_version = from_version
for migration in migrations:
current_config = migration.migrate(current_config)
current_version = migration.to_version
return current_config
def migrate_user_config(self, user_id, from_version, to_version):
config = self.load_user_config(user_id)
migrated_config = self.migrate_config(config, from_version, to_version)
self.save_user_config(user_id, migrated_config)
return migrated_config
def load_user_config(self, user_id):
pass
def save_user_config(self, user_id, config):
pass
## 版本回滚
### 1\. 回滚准备
bash
python
class RollbackPreparer:
def __init__(self):
self.version_repository = VersionRepository("./skills")
self.backup_repository = BackupRepository("./backups")
def prepare_rollback(self, skill_id, from_version, to_version):
# 验证版本
if not self.version_exists(skill_id, to_version):
raise VersionNotFoundError(to_version)
# 创建备份
backup_id = self.create_backup(skill_id, from_version)
# 准备回滚
rollback_plan = {
"skill_id": skill_id,
"from_version": from_version,
"to_version": to_version,
"backup_id": backup_id,
"steps": self.generate_rollback_steps(skill_id, from_version, to_version)
}
return rollback_plan
def version_exists(self, skill_id, version):
versions = self.version_repository.list_versions(skill_id)
return version in versions
def create_backup(self, skill_id, version):
# 创建备份
backup_id = self.backup_repository.create_backup(skill_id, version)
return backup_id
def generate_rollback_steps(self, skill_id, from_version, to_version):
# 生成回滚步骤
steps = [
{
"step": 1,
"action": "stop_skill",
"description": "停止 Skill"
},
{
"step": 2,
"action": "restore_version",
"description": f"恢复到版本 {to_version}"
},
{
"step": 3,
"action": "migrate_config",
"description": "迁移配置"
},
{
"step": 4,
"action": "start_skill",
"description": "启动 Skill"
},
{
"step": 5,
"action": "verify",
"description": "验证回滚"
}
]
return steps
### 2. 回滚执行
```python
class RollbackExecutor:
def __init__(self):
self.version_repository = VersionRepository("./skills")
self.backup_repository = BackupRepository("./backups")
self.config_migrator = ConfigMigrator(MigrationRegistry())
```python
def execute_rollback(self, rollback_plan):
try:
for step in rollback_plan["steps"]:
self.execute_step(rollback_plan, step)
return True
except Exception as e:
self.restore_backup(rollback_plan["backup_id"])
raise e
def execute_step(self, rollback_plan, step):
action = step["action"]
if action == "stop_skill":
self.stop_skill(rollback_plan["skill_id"])
elif action == "restore_version":
self.restore_version(
rollback_plan["skill_id"],
rollback_plan["to_version"]
)
elif action == "migrate_config":
self.migrate_config(
rollback_plan["from_version"],
rollback_plan["to_version"]
)
elif action == "start_skill":
self.start_skill(rollback_plan["skill_id"])
elif action == "verify":
self.verify_rollback(rollback_plan)
def stop_skill(self, skill_id):
pass
def restore_version(self, skill_id, version):
pass
def migrate_config(self, from_version, to_version):
pass
def start_skill(self, skill_id):
pass
def verify_rollback(self, rollback_plan):
pass
def restore_backup(self, backup_id):
self.backup_repository.restore_backup(backup_id)