Skip to content

25.2 代码生成与补全

代码生成与补全概述

Claude Code 提供了强大的代码生成与补全功能,可以根据自然语言描述、代码片段、API 规范等生成高质量的代码,并智能补全未完成的代码。

代码生成

1. 从自然语言生成代码

示例:从自然语言生成代码

用户请求: "创建一个用户认证系统,支持注册、登录、密码重置功能"

Claude Code 生成的代码:

python
    python


    ````python

```python
    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
    from flask_mail import Mail, Message
    from datetime import datetime, timedelta
    import bcrypt
    import secrets

    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///auth.db'
    app.config['JWT_SECRET_KEY'] = 'your-secret-key'
    app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)
    app.config['MAIL_SERVER'] = 'smtp.gmail.com'
    app.config['MAIL_PORT'] = 587
    app.config['MAIL_USE_TLS'] = True
    app.config['MAIL_USERNAME'] = 'your-email@gmail.com'
    app.config['MAIL_PASSWORD'] = 'your-password'

    db = SQLAlchemy(app)
    jwt = JWTManager(app)
    mail = Mail(app)

    class User(db.Model):
    """用户模型"""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    reset_token = db.Column(db.String(100), unique=True)
    reset_token_expires = db.Column(db.DateTime)

    class PasswordReset(db.Model):
    """密码重置记录"""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    @app.route('/register', methods=['POST'])
    def register():
    """用户注册"""
    data = request.get_json()

     # 验证输入
    if not data.get('username') or not data.get('email') or not data.get('password'):
    return jsonify({'error': 'Missing required fields'}), 400

     # 检查用户名是否已存在
    if User.query.filter_by(username=data['username']).first():
    return jsonify({'error': 'Username already exists'}), 400

     # 检查邮箱是否已存在
    if User.query.filter_by(email=data['email']).first():
    return jsonify({'error': 'Email already exists'}), 400

     # 验证密码强度
    if len(data['password']) < 8:
    return jsonify({'error': 'Password must be at least 8 characters'}), 400

     # 创建用户
    user = User(
    username=data['username'],
    email=data['email'],
    password_hash=bcrypt.hashpw(
    data['password'].encode('utf-8'),
    bcrypt.gensalt()
    ).decode('utf-8')
    )

    db.session.add(user)
    db.session.commit()

     # 生成访问令牌
    access_token = create_access_token(identity=user.id)

    return jsonify({
    'message': 'User registered successfully',
    'user': {
    'id': user.id,
    'username': user.username,
    'email': user.email
    },
    'access_token': access_token
    }), 201

    @app.route('/login', methods=['POST'])
    def login():
    """用户登录"""
    data = request.get_json()

     # 验证输入
    if not data.get('username') or not data.get('password'):
    return jsonify({'error': 'Missing username or password'}), 400

     # 查找用户
    user = User.query.filter_by(username=data['username']).first()

    if not user or not bcrypt.checkpw(
    data['password'].encode('utf-8'),
    user.password_hash.encode('utf-8')
    ):
    return jsonify({'error': 'Invalid username or password'}), 401

    if not user.is_active:
    return jsonify({'error': 'Account is deactivated'}), 403

     # 生成访问令牌
    access_token = create_access_token(identity=user.id)

    return jsonify({
    'message': 'Login successful',
    'user': {
    'id': user.id,
    'username': user.username,
    'email': user.email
    },
    'access_token': access_token
    })

    @app.route('/me', methods=['GET'])
    @jwt_required()
    def get_current_user():
    """获取当前用户信息"""
    user_id = get_jwt_identity()
    user = User.query.get(user_id)

    if not user:
    return jsonify({'error': 'User not found'}), 404

    return jsonify({
    'id': user.id,
    'username': user.username,
    'email': user.email,
    'is_active': user.is_active,
    'created_at': user.created_at.isoformat()
    })

    @app.route('/request-password-reset', methods=['POST'])
    def request_password_reset():
    """请求密码重置"""
    data = request.get_json()

    if not data.get('email'):
    return jsonify({'error': 'Email is required'}), 400

    user = User.query.filter_by(email=data['email']).first()

    if not user:
     # 为了安全,不透露邮箱是否存在
    return jsonify({
    'message': 'If the email exists, a reset link has been sent'
    }), 200

     # 生成重置令牌
    reset_token = secrets.token_urlsafe(32)
    reset_token_expires = datetime.utcnow() + timedelta(hours=1)

     # 保存重置令牌
    password_reset = PasswordReset(
    user_id=user.id,
    token=reset_token,
    expires_at=reset_token_expires
    )

    db.session.add(password_reset)
    db.session.commit()

     # 发送重置邮件
    reset_link = f"http://localhost:3000/reset-password?token={reset_token}"

    msg = Message(
    'Password Reset Request',
    recipients=[user.email]
    )
    msg.body = f'''
    Hello {user.username},

    You have requested to reset your password. Click the link below to reset your password:

    {reset_link}

    This link will expire in 1 hour.

    If you did not request this password reset, please ignore this email.
    '''

    mail.send(msg)

    return jsonify({
    'message': 'If the email exists, a reset link has been sent'
    }), 200

    @app.route('/reset-password', methods=['POST'])
    def reset_password():
    """重置密码"""
    data = request.get_json()

    if not data.get('token') or not data.get('password'):
    return jsonify({'error': 'Token and password are required'}), 400

     # 查找重置记录
    password_reset = PasswordReset.query.filter_by(
    token=data['token'],
    used=False
    ).first()

    if not password_reset:
    return jsonify({'error': 'Invalid or expired reset token'}), 400

    if password_reset.expires_at < datetime.utcnow():
    return jsonify({'error': 'Reset token has expired'}), 400

     # 验证密码强度
    if len(data['password']) < 8:
    return jsonify({'error': 'Password must be at least 8 characters'}), 400

     # 更新密码
    user = User.query.get(password_reset.user_id)
    user.password_hash = bcrypt.hashpw(
    data['password'].encode('utf-8'),
    bcrypt.gensalt()
    ).decode('utf-8')

     # 标记重置令牌为已使用
    password_reset.used = True

    db.session.commit()

    return jsonify({'message': 'Password reset successful'}), 200

    @app.route('/change-password', methods=['POST'])
    @jwt_required()
    def change_password():
    """修改密码"""
    data = request.get_json()
    user_id = get_jwt_identity()

    if not data.get('current_password') or not data.get('new_password'):
    return jsonify({'error': 'Current password and new password are required'}), 400

    user = User.query.get(user_id)

     # 验证当前密码
    if not bcrypt.checkpw(
    data['current_password'].encode('utf-8'),
    user.password_hash.encode('utf-8')
    ):
    return jsonify({'error': 'Current password is incorrect'}), 400

     # 验证新密码强度
    if len(data['new_password']) < 8:
    return jsonify({'error': 'New password must be at least 8 characters'}), 400

     # 更新密码
    user.password_hash = bcrypt.hashpw(
    data['new_password'].encode('utf-8'),
    bcrypt.gensalt()
    ).decode('utf-8')

    db.session.commit()

    return jsonify({'message': 'Password changed successfully'}), 200

    if __name__ == '__main__':
    with app.app_context():
    db.create_all()
    app.run(debug=True)

    ```> **生成的代码包含**:

    > - 用户注册功能
    > - 用户登录功能
    > - JWT 认证
    > - 密码重置请求
    > - 密码重置
    > - 修改密码
    > - 邮件发送
    > - 数据验证
    > - 错误处理

    ```
    ### 2. 从 API 规范生成代码

    # 示例:从 OpenAPI 规范生成代码
    用户请求:
    "根据这个 OpenAPI 规范生成 API 代码"
    OpenAPI 规范:

    ````yaml

    `yaml

    openapi: 3.0.0
    info:
    title: Product API
    version: 1.0.0

    paths:
    /products:
    get:
    summary: Get all products
    parameters:

     - name: page

    in: query
    schema:
    type: integer

     - name: per_page

    in: query
    schema:
    type: integer
    responses:
    '200':
    description: Success
    content:
    application/json:
    schema:
    type: object
    properties:
    products:
    type: array
    items:
    $ref: '#/components/schemas/Product'
    total:
    type: integer
    page:
    type: integer

    post:
    summary: Create a product
    requestBody:
    required: true
    content:
    application/json:
    schema:
    $ref: '#/components/schemas/ProductCreate'
    responses:
    '201':
    description: Created
    content:
    application/json:
    schema:
    $ref: '#/components/schemas/Product'

    /products/{product_id}:
    get:
    summary: Get a product
    parameters:

     - name: product_id

    in: path
    required: true
    schema:
    type: integer
    responses:
    '200':
    description: Success
    content:
    application/json:
    schema:
    $ref: '#/components/schemas/Product'

    put:
    summary: Update a product
    parameters:

     - name: product_id

    in: path
    required: true
    schema:
    type: integer
    requestBody:
    required: true
    content:
    application/json:
    schema:
    $ref: '#/components/schemas/ProductUpdate'
    responses:
    '200':
    description: Success
    content:
    application/json:
    schema:
    $ref: '#/components/schemas/Product'

    delete:
    summary: Delete a product
    parameters:

     - name: product_id

    in: path
    required: true
    schema:
    type: integer
    responses:
    '204':
    description: No content

    components:
    schemas:
    Product:
    type: object
    properties:
    id:
    type: integer
    name:
    type: string
    description:
    type: string
    price:
    type: number
    stock:
    type: integer
    category:
    type: string
    created_at:
    type: string
    format: date-time
    updated_at:
    type: string
    format: date-time

    ProductCreate:
    type: object
    required:

     - name
     - price
     - stock
     - category

    properties:
    name:
    type: string
    description:
    type: string
    price:
    type: number
    stock:
    type: integer
    category:
    type: string

    ProductUpdate:
    type: object
    properties:
    name:
    type: string
    description:
    type: string
    price:
    type: number
    stock:
    type: integer
    category:
    type: string

    ```> Claude Code 生成的代码:

    ```

    python

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    from typing import Optional

    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///products.db'

    db = SQLAlchemy(app)

    class Product(db.Model):
        """商品模型"""
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(200), nullable=False)
        description = db.Column(db.Text)
        price = db.Column(db.Float, nullable=False)
        stock = db.Column(db.Integer, nullable=False)
        category = db.Column(db.String(100), nullable=False)
        created_at = db.Column(db.DateTime, default=datetime.utcnow)
        updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

        def to_dict(self):
            """转换为字典"""
            return {
                'id': self.id,
                'name': self.name,
                'description': self.description,
                'price': self.price,
                'stock': self.stock,
                'category': self.category,
                'created_at': self.created_at.isoformat(),
                'updated_at': self.updated_at.isoformat()
            }

    @app.route('/products', methods=['GET'])
    def get_products():
        """获取所有商品"""
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)

        pagination = Product.query.paginate(page=page, per_page=per_page)

        return jsonify({
            'products': [product.to_dict() for product in pagination.items],
            'total': pagination.total,
            'page': page
        })

    @app.route('/products', methods=['POST'])
    def create_product():
        """创建商品"""
        data = request.get_json()

        # 验证必填字段

        required_fields = ['name', 'price', 'stock', 'category']
        for field in required_fields:
            if field not in data:
                return jsonify({'error': f'Missing required field: {field}'}), 400

        # 验证数据类型

        if not isinstance(data['price'], (int, float)) or data['price'] < 0:
            return jsonify({'error': 'Price must be a positive number'}), 400

        if not isinstance(data['stock'], int) or data['stock'] < 0:
            return jsonify({'error': 'Stock must be a non-negative integer'}), 400

        # 创建商品

        product = Product(
            name=data['name'],
            description=data.get('description'),
            price=data['price'],
            stock=data['stock'],
            category=data['category']
        )

        db.session.add(product)
        db.session.commit()

        return jsonify(product.to_dict()), 201

    @app.route('/products/<int:product_id>', methods=['GET'])
    def get_product(product_id):
        """获取商品详情"""
        product = Product.query.get_or_404(product_id)
        return jsonify(product.to_dict())

    @app.route('/products/<int:product_id>', methods=['PUT'])
    def update_product(product_id):
        """更新商品"""
        product = Product.query.get_or_404(product_id)
        data = request.get_json()

        # 更新字段

        if 'name' in data:
            product.name = data['name']

        if 'description' in data:
            product.description = data['description']

        if 'price' in data:
            if not isinstance(data['price'], (int, float)) or data['price'] < 0:
                return jsonify({'error': 'Price must be a positive number'}), 400
            product.price = data['price']

        if 'stock' in data:
            if not isinstance(data['stock'], int) or data['stock'] < 0:
                return jsonify({'error': 'Stock must be a non-negative integer'}), 400
            product.stock = data['stock']

        if 'category' in data:
            product.category = data['category']

        product.updated_at = datetime.utcnow()
        db.session.commit()

        return jsonify(product.to_dict())

    @app.route('/products/<int:product_id>', methods=['DELETE'])
    def delete_product(product_id):
        """删除商品"""
        product = Product.query.get_or_404(product_id)
        db.session.delete(product)
        db.session.commit()

        return '', 204

    if __name__ == '__main__':
        with app.app_context():
            db.create_all()
        app.run(debug=True)

    > **生成的代码包含**:
    > - 完整的 CRUD 操作
    > - 数据验证
    > - 分页功能
    > - 错误处理
    > - 符合 OpenAPI 规范

    ## 代码补全

    ### 1. 智能方法补全

    ```
    python

    # 示例:智能方法补全

    用户输入:

    ```python

    ```python

    class OrderService:
        def __init__(self, db):
            self.db = db

        def create_order(self, user_id, items):
            """创建订单"""
            pass

    Claude Code 自动补全:

    ````python

    ````python

    class OrderService:
    def __init__(self, db):
    self.db = db

    def create_order(self, user_id, items):
    """
    创建订单

    参数:
    user_id: 用户 ID
    items: 商品列表 [{'product_id': 1, 'quantity': 2}, ...]

    返回:
    订单对象
    """
     # 验证用户
    user = self.db.session.query(User).filter_by(id=user_id).first()
    if not user:
    raise UserNotFoundError(f"User with id {user_id} not found")

     # 验证商品
    product_ids = [item['product_id'] for item in items]
    products = self.db.session.query(Product).filter(
    Product.id.in_(product_ids)
    ).all()

    product_map = {product.id: product for product in products}

    for item in items:
    if item['product_id'] not in product_map:
    raise ProductNotFoundError(
    f"Product with id {item['product_id']} not found"
    )

    product = product_map[item['product_id']]
    if product.stock < item['quantity']:
    raise InsufficientStockError(
    f"Insufficient stock for product {product.name}"
    )

     # 计算总金额
    total_amount = sum(
    product_map[item['product_id']].price * item['quantity']
    for item in items
    )

     # 创建订单
    order = Order(
    user_id=user_id,
    total_amount=total_amount,
    status='pending'
    )

    self.db.session.add(order)
    self.db.session.flush()

     # 创建订单项
    for item in items:
    product = product_map[item['product_id']]
    order_item = OrderItem(
    order_id=order.id,
    product_id=item['product_id'],
    quantity=item['quantity'],
    price=product.price
    )
    self.db.session.add(order_item)

     # 扣减库存
    product.stock -= item['quantity']

    self.db.session.commit()

    return order

    def get_order(self, order_id):
    """获取订单详情"""
    order = self.db.session.query(Order).filter_by(id=order_id).first()
    if not order:
    raise OrderNotFoundError(f"Order with id {order_id} not found")
    return order

    def update_order_status(self, order_id, status):
    """更新订单状态"""
    order = self.get_order(order_id)
    order.status = status
    order.updated_at = datetime.utcnow()
    self.db.session.commit()
    return order

    def cancel_order(self, order_id):
    """取消订单"""
    order = self.get_order(order_id)

    if order.status in ['paid', 'shipped', 'completed']:
    raise InvalidOrderStatusError(
    f"Cannot cancel order with status {order.status}"
    )

     # 恢复库存
    for item in order.items:
    product = self.db.session.query(Product).filter_by(
    id=item.product_id
    ).first()
    if product:
    product.stock += item.quantity

    order.status = 'cancelled'
    order.updated_at = datetime.utcnow()
    self.db.session.commit()

    return order

    def get_user_orders(self, user_id, page=1, per_page=20):
    """获取用户订单列表"""
    pagination = self.db.session.query(Order).filter_by(
    user_id=user_id
    ).order_by(
    Order.created_at.desc()
    ).paginate(page=page, per_page=per_page)

    return pagination

    ```> **补全的内容**:

    > - 完整的方法实现
    > - 数据验证
    > - 错误处理
    > - 库存管理
    > - 相关方法补全

    ```
    ### 2. 智能类补全

    # 示例:智能类补全
    用户输入:

    ````python

    `python

    class UserRepository:
    def __init__(self, db):
    self.db = db

    ```> Claude Code 自动补全:

    ```

    python

    from typing import List, Optional
    from datetime import datetime

    class UserRepository:
        """用户仓储类"""

        def __init__(self, db):
            self.db = db

        def find_by_id(self, user_id: int) -> Optional[User]:
            """根据 ID 查找用户"""
            return self.db.session.query(User).filter_by(id=user_id).first()

        def find_by_username(self, username: str) -> Optional[User]:
            """根据用户名查找用户"""
            return self.db.session.query(User).filter_by(username=username).first()

        def find_by_email(self, email: str) -> Optional[User]:
            """根据邮箱查找用户"""
            return self.db.session.query(User).filter_by(email=email).first()

        def find_all(self, page: int = 1, per_page: int = 20):
            """查找所有用户"""
            return self.db.session.query(User).paginate(
                page=page,
                per_page=per_page
            )

        def create(self, user: User) -> User:
            """创建用户"""
            self.db.session.add(user)
            self.db.session.commit()
            return user

        def update(self, user: User) -> User:
            """更新用户"""
            user.updated_at = datetime.utcnow()
            self.db.session.commit()
            return user

        def delete(self, user_id: int) -> bool:
            """删除用户"""
            user = self.find_by_id(user_id)
            if not user:
                return False

            self.db.session.delete(user)
            self.db.session.commit()
            return True

        def exists_by_username(self, username: str) -> bool:
            """检查用户名是否存在"""
            return self.db.session.query(User).filter_by(
                username=username
            ).first() is not None

        def exists_by_email(self, email: str) -> bool:
            """检查邮箱是否存在"""
            return self.db.session.query(User).filter_by(
                email=email
            ).first() is not None

        def count(self) -> int:
            """统计用户数量"""
            return self.db.session.query(User).count()

        def search(self, keyword: str, page: int = 1, per_page: int = 20):
            """搜索用户"""
            return self.db.session.query(User).filter(
                User.username.ilike(f'%{keyword}%') |
                User.email.ilike(f'%{keyword}%')
            ).paginate(page=page, per_page=per_page)

    > **补全的内容**:
    > - 完整的 CRUD 方法
    > - 查询方法
    > - 存在性检查
    > - 搜索功能
    > - 分页支持

基于 MIT 许可发布 | 永久导航