# 25.2 代码生成与补全

## 代码生成与补全概述
Claude Code 提供了强大的代码生成与补全功能,可以根据自然语言描述、代码片段、API 规范等生成高质量的代码,并智能补全未完成的代码。
## 代码生成

### 1. 从自然语言生成代码
示例:从自然语言生成代码
用户请求: "创建一个用户认证系统,支持注册、登录、密码重置功能"
Claude Code 生成的代码:
```python
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from flask_mail import Mail, Message
from datetime import datetime, timedelta
import bcrypt
import secrets
# Application and extension setup. The secret key and mail credentials are
# placeholders: replace them (ideally from environment variables) before
# running this anywhere but a local demo.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///auth.db'
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
app.config['MAIL_PASSWORD'] = 'your-password'
# Flask-Mail refuses to send a message without a sender, and the password
# reset e-mail does not set one explicitly, so configure a default.
app.config['MAIL_DEFAULT_SENDER'] = app.config['MAIL_USERNAME']
db = SQLAlchemy(app)
jwt = JWTManager(app)
mail = Mail(app)
class User(db.Model):
    """Account record: login credentials plus lifecycle timestamps."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # bcrypt hash stored as text; the raw password is never persisted.
    password_hash = db.Column(db.String(128), nullable=False)
    # The login endpoint answers 403 for accounts where this is False.
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    reset_token = db.Column(db.String(100), unique=True)
    reset_token_expires = db.Column(db.DateTime)
class PasswordReset(db.Model):
    """One password-reset request: a single-use token with an expiry time."""

    id = db.Column(db.Integer, primary_key=True)
    # User declares no __tablename__, so Flask-SQLAlchemy names its table
    # "user"; the foreign key must reference that name, not "users".
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    expires_at = db.Column(db.DateTime, nullable=False)
    # Set to True once the token has been redeemed so it cannot be replayed.
    used = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
@app.route('/register', methods=['POST'])
def register():
    """Create a user account and return it together with an access token.

    Expects a JSON object with ``username``, ``email`` and ``password``.

    Returns:
        201 with the new user's public fields and a JWT, or 400 with an
        ``error`` message when a field is missing, the username or e-mail is
        already taken, or the password is shorter than 8 characters.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    if not username or not email or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username already exists'}), 400

    if User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email already exists'}), 400

    if len(password) < 8:
        return jsonify({'error': 'Password must be at least 8 characters'}), 400

    # Store only the salted bcrypt hash, decoded to text for the String column.
    hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
    user = User(
        username=username,
        email=email,
        password_hash=hashed.decode('utf-8')
    )
    db.session.add(user)
    db.session.commit()

    # The JWT "sub" claim must be a string; recent PyJWT releases reject
    # tokens whose subject is an integer.
    access_token = create_access_token(identity=str(user.id))

    return jsonify({
        'message': 'User registered successfully',
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email
        },
        'access_token': access_token
    }), 201
@app.route('/login', methods=['POST'])
def login():
    """Authenticate by username and password and issue an access token.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        200 with the user's public fields and a JWT; 400 when a field is
        missing; 401 for unknown user or wrong password; 403 when the
        account is deactivated.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({'error': 'Missing username or password'}), 400

    user = User.query.filter_by(username=username).first()
    # Same message for "no such user" and "wrong password" so the response
    # does not reveal which usernames exist.
    if not user or not bcrypt.checkpw(
        password.encode('utf-8'),
        user.password_hash.encode('utf-8')
    ):
        return jsonify({'error': 'Invalid username or password'}), 401

    if not user.is_active:
        return jsonify({'error': 'Account is deactivated'}), 403

    # The JWT "sub" claim must be a string; recent PyJWT releases reject
    # tokens whose subject is an integer.
    access_token = create_access_token(identity=str(user.id))

    return jsonify({
        'message': 'Login successful',
        'user': {
            'id': user.id,
            'username': user.username,
            'email': user.email
        },
        'access_token': access_token
    })
@app.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    """Return the profile of the user identified by the request's JWT.

    Returns:
        200 with id, username, email, is_active and created_at (ISO 8601),
        or 404 when the token's identity no longer matches a user.
    """
    current = User.query.get(get_jwt_identity())
    if not current:
        return jsonify({'error': 'User not found'}), 404

    profile = {
        'id': current.id,
        'username': current.username,
        'email': current.email,
        'is_active': current.is_active,
        'created_at': current.created_at.isoformat()
    }
    return jsonify(profile)
@app.route('/request-password-reset', methods=['POST'])
def request_password_reset():
    """Start a password reset by e-mailing a one-hour reset link.

    Expects a JSON object with ``email``.

    Returns:
        400 when ``email`` is missing; otherwise always 200 with the same
        message, whether or not the address belongs to an account.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    if not data.get('email'):
        return jsonify({'error': 'Email is required'}), 400

    user = User.query.filter_by(email=data['email']).first()
    if not user:
        # Do not reveal whether the address is registered.
        return jsonify({
            'message': 'If the email exists, a reset link has been sent'
        }), 200

    # Unguessable, URL-safe token valid for one hour.
    reset_token = secrets.token_urlsafe(32)
    reset_token_expires = datetime.utcnow() + timedelta(hours=1)

    password_reset = PasswordReset(
        user_id=user.id,
        token=reset_token,
        expires_at=reset_token_expires
    )
    db.session.add(password_reset)
    db.session.commit()

    reset_link = f"http://localhost:3000/reset-password?token={reset_token}"
    msg = Message(
        'Password Reset Request',
        recipients=[user.email]
    )
    # The literal below is the e-mail body verbatim, hence no indentation.
    msg.body = f'''
Hello {user.username},
You have requested to reset your password. Click the link below to reset your password:
{reset_link}
This link will expire in 1 hour.
If you did not request this password reset, please ignore this email.
'''
    mail.send(msg)

    return jsonify({
        'message': 'If the email exists, a reset link has been sent'
    }), 200
@app.route('/reset-password', methods=['POST'])
def reset_password():
    """Set a new password using a reset token.

    Expects a JSON object with ``token`` and ``password``.

    Returns:
        200 on success; 400 when a field is missing, the token is unknown,
        already used or expired, its user no longer exists, or the new
        password is shorter than 8 characters.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    if not data.get('token') or not data.get('password'):
        return jsonify({'error': 'Token and password are required'}), 400

    # Only tokens that have not been redeemed yet are considered.
    password_reset = PasswordReset.query.filter_by(
        token=data['token'],
        used=False
    ).first()
    if not password_reset:
        return jsonify({'error': 'Invalid or expired reset token'}), 400

    if password_reset.expires_at < datetime.utcnow():
        return jsonify({'error': 'Reset token has expired'}), 400

    if len(data['password']) < 8:
        return jsonify({'error': 'Password must be at least 8 characters'}), 400

    user = User.query.get(password_reset.user_id)
    if user is None:
        # The account was removed after the reset was requested.
        return jsonify({'error': 'Invalid or expired reset token'}), 400

    user.password_hash = bcrypt.hashpw(
        data['password'].encode('utf-8'),
        bcrypt.gensalt()
    ).decode('utf-8')

    # Mark the token as redeemed so it cannot be replayed.
    password_reset.used = True
    db.session.commit()

    return jsonify({'message': 'Password reset successful'}), 200
@app.route('/change-password', methods=['POST'])
@jwt_required()
def change_password():
    """Change the authenticated user's password.

    Expects a JSON object with ``current_password`` and ``new_password``.

    Returns:
        200 on success; 400 when a field is missing, the current password is
        wrong, or the new password is shorter than 8 characters; 404 when the
        token's identity no longer matches a user.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}
    user_id = get_jwt_identity()

    if not data.get('current_password') or not data.get('new_password'):
        return jsonify({'error': 'Current password and new password are required'}), 400

    user = User.query.get(user_id)
    if user is None:
        # A still-valid token can outlive its account.
        return jsonify({'error': 'User not found'}), 404

    if not bcrypt.checkpw(
        data['current_password'].encode('utf-8'),
        user.password_hash.encode('utf-8')
    ):
        return jsonify({'error': 'Current password is incorrect'}), 400

    if len(data['new_password']) < 8:
        return jsonify({'error': 'New password must be at least 8 characters'}), 400

    user.password_hash = bcrypt.hashpw(
        data['new_password'].encode('utf-8'),
        bcrypt.gensalt()
    ).decode('utf-8')
    db.session.commit()

    return jsonify({'message': 'Password changed successfully'}), 200
if __name__ == '__main__':
    # Create any missing tables, then start the development server.
    with app.app_context():
        db.create_all()
    app.run(debug=True)
```

> **生成的代码包含**:
> - 用户注册功能
> - 用户登录功能
> - JWT 认证
> - 密码重置请求
> - 密码重置
> - 修改密码
> - 邮件发送
> - 数据验证
> - 错误处理
### 2. 从 API 规范生成代码
示例:从 OpenAPI 规范生成代码
用户请求:
"根据这个 OpenAPI 规范生成 API 代码"
OpenAPI 规范:
```yaml
# Product API: list/create under /products; read/update/delete under
# /products/{product_id}. Schemas are defined under components.
openapi: 3.0.0
info:
  title: Product API
  version: 1.0.0
paths:
  /products:
    get:
      summary: Get all products
      parameters:
        - name: page
          in: query
          schema:
            type: integer
        - name: per_page
          in: query
          schema:
            type: integer
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                type: object
                properties:
                  products:
                    type: array
                    items:
                      $ref: '#/components/schemas/Product'
                  total:
                    type: integer
                  page:
                    type: integer
    post:
      summary: Create a product
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ProductCreate'
      responses:
        '201':
          description: Created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'
  /products/{product_id}:
    get:
      summary: Get a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'
    put:
      summary: Update a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ProductUpdate'
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'
    delete:
      summary: Delete a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '204':
          description: No content
components:
  schemas:
    Product:
      type: object
      properties:
        id:
          type: integer
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string
        created_at:
          type: string
          format: date-time
        updated_at:
          type: string
          format: date-time
    ProductCreate:
      type: object
      required:
        - name
        - price
        - stock
        - category
      properties:
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string
    ProductUpdate:
      type: object
      properties:
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string
```

Claude Code 生成的代码:

```python
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from typing import Optional
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///products.db'
db = SQLAlchemy(app)
class Product(db.Model):
    """Catalogue item with price, stock level and category."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text)
    price = db.Column(db.Float, nullable=False)
    stock = db.Column(db.Integer, nullable=False)
    category = db.Column(db.String(100), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Return the product as a JSON-ready dict; timestamps are ISO 8601 strings."""
        plain_fields = ('id', 'name', 'description', 'price', 'stock', 'category')
        payload = {field: getattr(self, field) for field in plain_fields}
        payload['created_at'] = self.created_at.isoformat()
        payload['updated_at'] = self.updated_at.isoformat()
        return payload
@app.route('/products', methods=['GET'])
def get_products():
    """List products one page at a time.

    Query parameters ``page`` (default 1) and ``per_page`` (default 20)
    select the slice; the response carries the items, the total count and
    the requested page number.
    """
    args = request.args
    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 20, type=int)

    result = Product.query.paginate(page=page, per_page=per_page)
    products = [entry.to_dict() for entry in result.items]

    return jsonify({
        'products': products,
        'total': result.total,
        'page': page
    })
@app.route('/products', methods=['POST'])
def create_product():
    """Create a product from a JSON object.

    Required keys: ``name``, ``price``, ``stock``, ``category``;
    ``description`` is optional.

    Returns:
        201 with the stored product, or 400 with an ``error`` message when a
        required key is missing or ``price``/``stock`` is not a
        non-negative number/integer.
    """
    # A body that is not a JSON object (null, list, ...) is treated as empty
    # so the required-field check answers 400 instead of raising.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    for field in ('name', 'price', 'stock', 'category'):
        if field not in data:
            return jsonify({'error': f'Missing required field: {field}'}), 400

    price = data['price']
    stock = data['stock']
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
        # Zero is accepted, so the message says "non-negative", not "positive".
        return jsonify({'error': 'Price must be a non-negative number'}), 400

    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        return jsonify({'error': 'Stock must be a non-negative integer'}), 400

    product = Product(
        name=data['name'],
        description=data.get('description'),
        price=price,
        stock=stock,
        category=data['category']
    )
    db.session.add(product)
    db.session.commit()

    return jsonify(product.to_dict()), 201
@app.route('/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Return a single product as JSON; responds 404 if the id is unknown."""
    found = Product.query.get_or_404(product_id)
    body = found.to_dict()
    return jsonify(body)
@app.route('/products/<int:product_id>', methods=['PUT'])
def update_product(product_id):
    """Update the supplied fields of an existing product.

    Accepts any subset of ``name``, ``description``, ``price``, ``stock``
    and ``category`` in a JSON object; keys that are absent are left as is.

    Returns:
        200 with the updated product, 400 when ``price``/``stock`` is not a
        non-negative number/integer, or 404 when the id is unknown.
    """
    product = Product.query.get_or_404(product_id)
    # A body that is not a JSON object (null, list, ...) is treated as empty.
    payload = request.get_json()
    data = payload if isinstance(payload, dict) else {}

    # Validate everything before touching the model so a rejected request
    # never leaves the object partially modified.
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if 'price' in data:
        price = data['price']
        if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
            # Zero is accepted, so the message says "non-negative".
            return jsonify({'error': 'Price must be a non-negative number'}), 400

    if 'stock' in data:
        stock = data['stock']
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            return jsonify({'error': 'Stock must be a non-negative integer'}), 400

    for field in ('name', 'description', 'price', 'stock', 'category'):
        if field in data:
            setattr(product, field, data[field])

    product.updated_at = datetime.utcnow()
    db.session.commit()

    return jsonify(product.to_dict())
@app.route('/products/<int:product_id>', methods=['DELETE'])
def delete_product(product_id):
    """Delete a product; responds 204 with an empty body, or 404 if the id is unknown."""
    target = Product.query.get_or_404(product_id)
    session = db.session
    session.delete(target)
    session.commit()
    return '', 204
if __name__ == '__main__':
    # Create any missing tables, then start the development server.
    with app.app_context():
        db.create_all()
    app.run(debug=True)
```

> **生成的代码包含**:
> - 完整的 CRUD 操作
> - 数据验证
> - 分页功能
> - 错误处理
> - 符合 OpenAPI 规范
## 代码补全
### 1. 智能方法补全
示例:智能方法补全

用户输入:

```python
class OrderService:
    def __init__(self, db):
        self.db = db

    def create_order(self, user_id, items):
        """Create an order."""
        pass
```

Claude Code 自动补全:

```python
class OrderService:
    """Order workflow (create, read, update status, cancel, list) on top of
    an injected database handle exposing ``session``."""

    # Statuses from which an order can no longer be cancelled.
    _NON_CANCELLABLE = ('paid', 'shipped', 'completed')

    def __init__(self, db):
        self.db = db

    def create_order(self, user_id, items):
        """Create a pending order and deduct the ordered quantities from stock.

        Args:
            user_id: ID of the ordering user.
            items: List such as ``[{'product_id': 1, 'quantity': 2}, ...]``.

        Returns:
            The committed order object.

        Raises:
            ValueError: ``items`` is empty or a quantity is not a positive
                integer.
            UserNotFoundError: No user has ``user_id``.
            ProductNotFoundError: An item references an unknown product.
            InsufficientStockError: The requested total exceeds a product's
                stock.
        """
        if not items:
            raise ValueError("Order must contain at least one item")

        # Sum quantities per product so that several lines for the same
        # product are checked against stock together. A non-positive
        # quantity would otherwise *increase* stock and lower the total.
        requested = {}
        for item in items:
            quantity = item['quantity']
            if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
                raise ValueError(
                    f"Quantity for product {item['product_id']} must be a positive integer"
                )
            requested[item['product_id']] = requested.get(item['product_id'], 0) + quantity

        user = self.db.session.query(User).filter_by(id=user_id).first()
        if not user:
            raise UserNotFoundError(f"User with id {user_id} not found")

        # Load every referenced product with a single query.
        products = self.db.session.query(Product).filter(
            Product.id.in_(list(requested))
        ).all()
        product_map = {product.id: product for product in products}

        for product_id, quantity in requested.items():
            product = product_map.get(product_id)
            if product is None:
                raise ProductNotFoundError(
                    f"Product with id {product_id} not found"
                )
            if product.stock < quantity:
                raise InsufficientStockError(
                    f"Insufficient stock for product {product.name}"
                )

        total_amount = sum(
            product_map[item['product_id']].price * item['quantity']
            for item in items
        )

        order = Order(
            user_id=user_id,
            total_amount=total_amount,
            status='pending'
        )
        self.db.session.add(order)
        # Flush so order.id is assigned before the items reference it.
        self.db.session.flush()

        for item in items:
            product = product_map[item['product_id']]
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                # Snapshot the price at the time of ordering.
                price=product.price
            )
            self.db.session.add(order_item)
            product.stock -= item['quantity']

        self.db.session.commit()
        return order

    def get_order(self, order_id):
        """Return the order with ``order_id``.

        Raises:
            OrderNotFoundError: No such order exists.
        """
        order = self.db.session.query(Order).filter_by(id=order_id).first()
        if not order:
            raise OrderNotFoundError(f"Order with id {order_id} not found")
        return order

    def update_order_status(self, order_id, status):
        """Set the order's status, refresh ``updated_at`` and return the order."""
        order = self.get_order(order_id)
        order.status = status
        order.updated_at = datetime.utcnow()
        self.db.session.commit()
        return order

    def cancel_order(self, order_id):
        """Cancel an order and return its items to stock.

        Cancelling an order that is already cancelled is a no-op, so stock
        is never restored twice.

        Raises:
            OrderNotFoundError: No such order exists.
            InvalidOrderStatusError: The order is paid, shipped or completed.
        """
        order = self.get_order(order_id)
        if order.status == 'cancelled':
            return order
        if order.status in self._NON_CANCELLABLE:
            raise InvalidOrderStatusError(
                f"Cannot cancel order with status {order.status}"
            )

        for item in order.items:
            product = self.db.session.query(Product).filter_by(
                id=item.product_id
            ).first()
            # The product may have been deleted since the order was placed.
            if product:
                product.stock += item.quantity

        order.status = 'cancelled'
        order.updated_at = datetime.utcnow()
        self.db.session.commit()
        return order

    def get_user_orders(self, user_id, page=1, per_page=20):
        """Return a pagination object of the user's orders, newest first."""
        pagination = self.db.session.query(Order).filter_by(
            user_id=user_id
        ).order_by(
            Order.created_at.desc()
        ).paginate(page=page, per_page=per_page)
        return pagination
```

> **补全的内容**:
> - 完整的方法实现
> - 数据验证
> - 错误处理
> - 库存管理
> - 相关方法补全
### 2. 智能类补全
示例:智能类补全
用户输入:
```python
class UserRepository:
    def __init__(self, db):
        self.db = db
```

Claude Code 自动补全:

```python
from typing import List, Optional
from datetime import datetime
class UserRepository:
    """Data-access helper for ``User`` rows on top of an injected database
    handle exposing ``session``.

    ``User`` annotations are quoted so they are not evaluated when the class
    is defined.
    """

    def __init__(self, db):
        self.db = db

    @staticmethod
    def _escape_like(keyword: str) -> str:
        """Backslash-escape ``\\``, ``%`` and ``_`` so they match literally in LIKE."""
        return (
            keyword.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

    def find_by_id(self, user_id: int) -> Optional["User"]:
        """Return the user with the given ID, or None."""
        return self.db.session.query(User).filter_by(id=user_id).first()

    def find_by_username(self, username: str) -> Optional["User"]:
        """Return the user with the given username, or None."""
        return self.db.session.query(User).filter_by(username=username).first()

    def find_by_email(self, email: str) -> Optional["User"]:
        """Return the user with the given e-mail address, or None."""
        return self.db.session.query(User).filter_by(email=email).first()

    def find_all(self, page: int = 1, per_page: int = 20):
        """Return one page of all users as a pagination object."""
        return self.db.session.query(User).paginate(
            page=page,
            per_page=per_page
        )

    def create(self, user: "User") -> "User":
        """Add the user to the session, commit, and return it."""
        self.db.session.add(user)
        self.db.session.commit()
        return user

    def update(self, user: "User") -> "User":
        """Refresh ``updated_at``, commit pending changes, and return the user."""
        user.updated_at = datetime.utcnow()
        self.db.session.commit()
        return user

    def delete(self, user_id: int) -> bool:
        """Delete the user with the given ID.

        Returns:
            True if a user was deleted, False if no such user exists.
        """
        user = self.find_by_id(user_id)
        if not user:
            return False

        self.db.session.delete(user)
        self.db.session.commit()
        return True

    def exists_by_username(self, username: str) -> bool:
        """Return True if a user with this username exists."""
        return self.find_by_username(username) is not None

    def exists_by_email(self, email: str) -> bool:
        """Return True if a user with this e-mail address exists."""
        return self.find_by_email(email) is not None

    def count(self) -> int:
        """Return the total number of users."""
        return self.db.session.query(User).count()

    def search(self, keyword: str, page: int = 1, per_page: int = 20):
        """Return one page of users whose username or e-mail contains ``keyword``.

        Matching is case-insensitive. ``%`` and ``_`` in the keyword are
        matched literally rather than acting as LIKE wildcards.
        """
        pattern = f'%{self._escape_like(keyword)}%'
        return self.db.session.query(User).filter(
            User.username.ilike(pattern, escape='\\') |
            User.email.ilike(pattern, escape='\\')
        ).paginate(page=page, per_page=per_page)
```

> **补全的内容**:
> - 完整的 CRUD 方法
> - 查询方法
> - 存在性检查
> - 搜索功能
> - 分页支持